
订单的超时取消很多系统采用定时任务,实际上达不到要求。我用的是延迟队列,但缺点是只实现了基于jvm的,分布式采用的是修改之前去查询订单状态,以及分布式锁获取的方式来控制,这样获得锁的,先去查订单是否已经取消掉,如果没有,就改成去掉。但这种方式虽然效果不错,我对这个半吊子的思路还是不满意的,后面有时间再优化一些,做成分布式的。
整个功能实际就是利用延迟队列的特性。延迟队列有个时间属性,一旦到达这个时间节点,会立即从队列中弹出任务,这时候,轮询守护线程就可以执行这个任务了。如果没有到达这个时间节点,无论轮询守护线程怎么轮询,都查不到对应的任务弹出。从jdk1.5开始,java就提供了Delayed接口,用于延迟队列。该接口是继承于Comparable<T>的。至于延迟队列里的任务是什么样子,则由自己定义,我为了便于多线程跑,使用了继承了Runnable的任务,所以定义成这个样子。
执行流程:
- 系统启动,初始化延迟队列。随即会启动守护线程,用于轮询延迟队列里的任务到期弹出。
- 将现有数据库中处于待支付状态的订单插入延迟队列。
- 一旦有新的订单加入,则放入延迟队列中。
- 一旦有任务到期,则守护线程会接到弹出的任务,并执行该任务。
- 该任务会从更新数据库中的订单状态。
- 一旦有用户取消订单/支付订单,则从延迟队列中查询到该订单的延迟任务,手动删除该任务。
这个是使用的spring boot的线程池的,所以需要配置一个线程池。
延迟队列代码:
package com.xxxx.xxxx.config.taskThreadPool;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* @Author: xiehao
* @Date: 2018-08-11
* @Description 延迟队列中放入的task,里面包含到时间之后要处理的任务,该任务类实现了Runnable,用于交给线程池管理
*/
public class DelayCancelOrderTask<T extends Runnable> implements Delayed {
/**
* 到期时间
*/
private final long time;
/**
* 任务对象
*/
private final T processor;
/**
* 原子类
*/
private static final AtomicLong atomic = new AtomicLong(0);
private final long sequence;
public DelayCancelOrderTask(long timeout, T processor){
this.time = System.nanoTime() + timeout;
this.processor = processor;
this.sequence = atomic.getAndIncrement();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == this){
return 0;
}
if (o instanceof DelayCancelOrderTask){
DelayCancelOrderTask<?> other = (DelayCancelOrderTask<?>)o;
long diff = this.time - other.time;
if (diff > 0) {
return 1;
} else if (diff < 0) {
return -1;
}else if(this.sequence < other.sequence){
return -1;
} else {
return 0;
}
}
long diffrent = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (diffrent == 0)?0:((diffrent < 0) ? -1 : 1);
}
public T getProcessor(){
return this.processor;
}
@Override
public int hashCode(){
return processor.hashCode();
}
@Override
public boolean equals(Object object)
{
if (object != null)
{
return object.hashCode() == hashCode() ? true : false;
}
return false;
}
}
延迟队列中放置的任务:
package com.xxxx.xxxx.config.taskThreadPool;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import com.xxxx.xxxx.domain.vo.MonitorOrderVO;
import com.xxxx.xxxx.enums.EnumWorkOrderSource;
import com.xxxx.xxxx.utils.MessageProducer;
import org.springframework.scheduling.annotation.Async;
import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.enums.EnumB2BOrderStatus;
import com.xxxx.xxxx.service.MonitorOrderService;
import com.xxxx.xxxx.utils.BeanFactoryGetter;
/**
* @Author: xiehao
* @Date: 2018-08-11
* @Description 超时取消的时间到了,这里要执行对订单的具体取消操作
*/
public class PayTimeoutCancelOrderProcessor implements Runnable {
private String b2bTaskNo;
public PayTimeoutCancelOrderProcessor(String b2bTaskNo){
this.b2bTaskNo = b2bTaskNo;
}
@Override
@Async
public void run(){
//因service无法注入,只能从bean工厂中拉取
MonitorOrderService monitorOrderService = (MonitorOrderService)BeanFactoryGetter.getBean("monitorOrderService");
MonitorOrder monitorOrder = new MonitorOrder();
monitorOrder.setB2bTaskNo(b2bTaskNo);
monitorOrder.setCurrentStatus(EnumB2BOrderStatus.OVERTIME_PAYMENT.getKey());
monitorOrder.setCurrentStatusTime(LocalDateTime.now());
monitorOrderService.uptCurrentStatusByTaskNoForOverTimeCancel(monitorOrder);
//推送直接用户订单消息。现在还没有推送系统正式上线,所以先暂时注释掉
}
}
延迟队列的管理类,有轮询守护线程:
package com.xxxx.xxxx.config.taskThreadPool;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* @Author: xiehao
* @Date: 2018-08-11
* @Description 超时取消的管理类,管理延迟队列,放出一个线程轮询监控延迟队列;有一些方法用来往延迟队列中压入task,有方法用来从队列中remove task。还有初始化方法,开始执行轮询监控线程
*/
@Component
public class DelayCancelOrderTaskManager {
private static final Logger logger = LoggerFactory.getLogger(DelayCancelOrderTaskManager.class);
//延迟队列,一切都是围绕它来运转的
private DelayQueue<DelayCancelOrderTask<?>> delayQueue;
//超时取消的管理类是个单例,因为系统启动要有方法往队列中插入延迟task,所以搞成饿汉模式
private static DelayCancelOrderTaskManager instance = new DelayCancelOrderTaskManager();
// 守护线程,用于轮询延时队列
private Thread daemonThread;
//该方法初始管理类时调用,初始化延迟队列,同时初始化轮询线程
private DelayCancelOrderTaskManager(){
delayQueue = new DelayQueue<DelayCancelOrderTask<?>>();
this.init();
}
public static DelayCancelOrderTaskManager getInstance(){
return instance;
}
//初始化轮询监控守护线程
public void init(){
//lambda表达式,->的意思其实是静态方法,初始化随即执行的。
daemonThread = new Thread(()-> {
try{
System.out.println("daemonThread start");
execute();
}catch (Exception e){
logger.error("轮询线程出错",e);
}
});
daemonThread.setName("DelayQueueMonitorThread");
daemonThread.start();
}
//初始化的时候开始执行
private void execute(){
//不断轮询
while(true){
//此处仅为打印日志方便
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
logger.info("线程数--------------" + map.size());
logger.info(System.currentTimeMillis()+" 队列中的个数:"+delayQueue.size());
try{
//从队列中取出可以取出的task。延时队列有个特点,不到时间取不出来,所以能取出来的,都是到时间即将执行的。
DelayCancelOrderTask<?> delayCancelOrderTask = delayQueue.take();
//task不为空,则开始执行
if(delayCancelOrderTask != null){
//获取task里面的处理线程,该线程会丢到线程池中处理
Runnable payTimeoutCancelOrderProcessor = delayCancelOrderTask.getProcessor();
if(payTimeoutCancelOrderProcessor == null){
continue;
}
//线程执行
payTimeoutCancelOrderProcessor.run();
//执行完毕,从队列中删除task
this.removeTask(delayCancelOrderTask);
}
}catch (Exception e){
logger.error("线程执行错误:",e);
}
}
}
/**
* @Author xiehao
* @Date 2018/8/11 18:15
* @Param
* @Description 传入的超时时间为以秒为单位
*/
public void putTaskInSeconds(Runnable task,long timeoutPeriod){
long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.SECONDS);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
delayQueue.put(delayCancelOrderTask);
}
/**
* @Author xiehao
* @Date 2018/8/11 18:15
* @Param
* @Description 传入的超时时间以为分钟为单位
*/
public void putTaskInMinites(Runnable task,long timeoutPeriod){
long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.MINUTES);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
delayQueue.put(delayCancelOrderTask);
}
/**
* @Author xiehao
* @Date 2018/8/11 18:15
* @Param
* @Description 传入的超时时间以小时为单位
*/
public void putTaskInHours(Runnable task,long timeoutPeriod){
long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.HOURS);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
delayQueue.put(delayCancelOrderTask);
}
/**
* @Author xiehao
* @Date 2018/8/11 18:15
* @Param
* @Description 传入的超时时间为自定义的单位
*/
public void putTaskInOwnDefine(Runnable task,long timeoutPeriod,TimeUnit unit){
long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,unit);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
delayQueue.put(delayCancelOrderTask);
}
/**
* @Author xiehao
* @Date 2018/8/11 18:15
* @Param
* @Description 传入的时间为超时时间点
*/
public void putTaskInTimeoutTime(Runnable task,LocalDateTime timeoutTime){
Duration duration = Duration.between(LocalDateTime.now(),timeoutTime);
long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(),TimeUnit.NANOSECONDS);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
delayQueue.put(delayCancelOrderTask);
}
/**
* @Author xiehao
* @Date 2018/8/13 15:43
* @Param
* @Description 从队列中删除某个task。一般在用户自己取消订单的时候执行
*/
public boolean removeTask(DelayCancelOrderTask<? extends Runnable> task){
return delayQueue.remove(task);
}
/**
* @Author xiehao
* @Date 2018/8/13 15:44
* @Param
* @Description 判断队列中是否含有某个task
*/
public boolean contains(DelayCancelOrderTask<? extends Runnable> task){
return delayQueue.contains(task);
}
//获取队列个数。这个方法专门给打印日志核对数据用的。一般用不着它
public Integer getDelayQueueSize(){
System.out.println("队列中的个数:"+delayQueue.size());
return delayQueue.size();
}
}
在系统初始化的时候,需要将数据库中已经处于等待超时状态的订单打入延迟队列:
package com.xxxx.xxxx.config.runner;
import com.xxxx.xxxx.config.taskThreadPool.DelayCancelOrderTaskManager;
import com.xxxx.xxxx.config.taskThreadPool.PayTimeoutCancelOrderProcessor;
import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.service.MonitorOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author: xiehao
* @Date: 2018-08-11
* @Description 系统初始化时将处于待支付状态的单据丢到延时队列中,等待执行超时任务
*/
@Component
public class OrderPayTimeoutQueueRunner implements CommandLineRunner {
@Autowired
private MonitorOrderService monitorOrderService;
@Override
public void run(String... strings) throws Exception {
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
List<MonitorOrder> monitorOrderList = monitorOrderService.getPayNotTimeoutOrderList();
for(MonitorOrder monitorOrder : monitorOrderList){
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(monitorOrder.getB2bTaskNo());
delayCancelOrderTaskManager.putTaskInTimeoutTime(processor,monitorOrder.getOrderPayTimeout());
}
}
}
如果用户取消订单/中间执行支付操作后,则从延迟队列中去掉任务:
//取消预约单,则删除延时队列中的超时等待信号 xiehao start
MonitorOrder targetOrder = monitorOrderMapper.getMonitorOrderByTaskNo(taskNum);
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(targetOrder.getB2bTaskNo());
Duration duration = Duration.between(LocalDateTime.now(), targetOrder.getOrderPayTimeout());
long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout, processor);
delayCancelOrderTaskManager.removeTask(delayCancelOrderTask);
//取消预约单,则删除延时队列中的超时等待信号 xiehao end
如果用户中间下单,则加入延迟队列:
//加入延时队列,等待超时取消 xiehao
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(order.getB2bTaskNo());
delayCancelOrderTaskManager.putTaskInTimeoutTime(processor, order.getOrderPayTimeout());
工具类:
package com.xxxx.xxxx.utils;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
/**
* @Author: xiehao
* @Date: 2018-08-04
*/
@Component
public class BeanFactoryGetter implements BeanFactoryAware {
//Bean工厂必须是static类型,否则系统启动的时候将无法写入factory
private static BeanFactory factory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
factory = beanFactory;
}
public static Object getBean(String beanName){
if(StringUtils.isEmpty(beanName)){
return null;
}
Object t= factory.getBean(beanName);
return t;
}
}