摘要:线程池任务队列构造方法,实例化时启动线程设置任务队列,用于任务重新入队任务入队从延迟队列中获取任务利用线程池执行任务实现了接口,执行实际的业务并决定任务是否重新进入延迟队列。
前言
接入微信支付的时候,看到微信支付的回调是按照某种频率去回调的,
像15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h这样,其中有一次成功就不会再回调。
于是在想怎么用Java做这个事情。
有定时任务这类功能的框架像Spring和Quartz貌似都没有直接提供以上的功能。
也是出于想练手的目的,决定自己写一写。
// 具体的业务 BaseJob task = new BaseJob() { // 任务执行的次数(模拟真实业务上的退出) int runTime = 1; @Override public void run() { // 业务逻辑 System.out.println("hello world"); // 这里模拟了微信回调成功,任务完成 if (runTime++ > 3) { this.setExit(true); } } };
/** * 测试按照指定时间隔执行某个任务 * @throws IOException */ @Test public void test1() throws IOException { // 新建一个产生指定时间的延迟时间生成器,内部就是个队列 DesignatDTGenerator designatDTGenerator = new DesignatDTGenerator(); // 设置时间间隔 designatDTGenerator.addDelayTime(1_000) // 1秒后执行 .addDelayTime(4_000) // 距离上次执行4秒后执行 .addDelayTime(15_000) // 距离上次执行15秒后执行 .addDelayTime(180_000) // 距离上次执行3分钟后执行 .addDelayTime(180_000) // 距离上次执行3分钟后执行 .addDelayTime(360_000) // 距离上次执行6分钟后执行 .addDelayTime(3_600_000); // 距离上次执行1小时后执行 // 构造一个提交的任务,传入具体的业务对象task,传入延迟时间生成器designatDTGenerator DelayTimeJob delayTimeJob = new DelayTimeJob(task, designatDTGenerator); // 新建一个执行器,执行器可以重复使用,每次提交新的任务即可 JobActuator actuator = new JobActuator(); // 提交任务,开始执行任务 actuator.addJob(delayTimeJob); // 阻塞主线程,方便查看运行结果 System.in.read(); }
/** * 测试按照固定时间间隔执行某个任务 * 只是延迟时间生成器不同而已,可以达到不同的调用效果 * @throws IOException */ @Test public void test2() throws IOException { // 新建一个执行器 JobActuator actuator = new JobActuator(); // 新建一个产生固定时间的延迟时间生成器,每3s执行一次 FixedRateDTGenerator fixedRateDTGenerator = new FixedRateDTGenerator(3000); // 新建一个任务 DelayTimeJob delayTimeJob = new DelayTimeJob(task, fixedRateDTGenerator); // 提交任务,开始执行任务 actuator.addJob(delayTimeJob); // 阻塞主线程,方便查看运行结果 System.in.read(); }类图 各个类的作用
项目地址
JobActuator
任务执行器,本身继承了Thread,职责是在run方法中不断从延迟任务队列DelayQueue中获取延迟到期的任务,
再交由线程池ExecutorService执行。延迟效果的都是依靠DelayQueue实现。
public class JobActuator extends Thread { /** 线程池 */ ExecutorService es = Executors.newFixedThreadPool(2); /** 任务队列 */ DelayQueuejobs = new DelayQueue<>(); /** 构造方法,实例化时启动线程 */ public JobActuator() { this.start(); } public void addJob(DelayTimeJob job) { // 设置任务队列,用于任务重新入队 job.setJobs(jobs); // 任务入队 jobs.offer(job); } @Override public void run() { while (true) { try { // 从延迟队列中获取任务 DelayTimeJob job = jobs.take(); // 利用线程池执行任务 es.submit(job); } catch (InterruptedException e) { e.printStackTrace(); } } } }
DelayTimeJob
实现了Delayed接口,执行实际的业务并决定任务是否重新进入延迟队列。
public class DelayTimeJob implements Runnable, Delayed { /** 执行器的任务队列,用于任务重新入队 */ @Setter private DelayQueuejobs; /** 延迟时间生成器 */ IDelayTimeGenerator delayTimeGenerator; /** 具体要执行的任务 */ private BaseJob realJob; private long time = 0L; public DelayTimeJob(BaseJob baseJob, IDelayTimeGenerator delayTimeGenerator) { this.realJob = baseJob; this.delayTimeGenerator = delayTimeGenerator; Integer delayTime = delayTimeGenerator.getDelayTime(); if (delayTime == null) { return ; } this.time = delayTime + System.currentTimeMillis(); } @Override public void run() { // 执行业务 realJob.run(); // 任务不再需要执行,主动退出 if (realJob.isExit) { return ; } // 获取延迟 Integer delayTime = delayTimeGenerator.getDelayTime(); // 无延迟时间,则任务不再执行 if (delayTime == null) { return ; } // 重新入队 time += delayTime; jobs.offer(this); return ; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { DelayTimeJob other = (DelayTimeJob) o; long diff = time - other.time; if (diff > 0) { return 1; } if (diff < 0) { return -1; } return 0; } }
BaseJob
用户继承此抽象类,在run方法中编写业务代码,通过控制isExit变量控制任务是否执行。
public abstract class BaseJob implements Runnable { /** 用于控制任务是否退出 */ @Setter boolean isExit = false; }
IDelayTimeGenerator
延迟时间生成器接口,返回一个延迟时间。可以实现不同的策略,达到不同的延迟效果。
如DesignatDTGenerator是定义每一次执行的时间间隔,FixedRateDTGenerator是按照某一个固定频率执行。
public interface IDelayTimeGenerator { /** 返回延迟的时间,单位:毫秒 */ Integer getDelayTime(); }
/** * 指定时间的时间生成器 * @author cck */ public class DesignatDTGenerator implements IDelayTimeGenerator { private final DequedelayTimeQueue = new ArrayDeque<>(); /** * 添加延迟时间 * @param delayTime */ public DesignatDTGenerator addDelayTime(Integer delayTime) { delayTimeQueue.offer(delayTime); return this; } @Override public Integer getDelayTime() { return delayTimeQueue.poll(); } }
/** * 固定间隔的时间生成器 * @author cck */ public class FixedRateDTGenerator implements IDelayTimeGenerator { private Integer delayTime; public FixedRateDTGenerator(Integer delayTime) { this.delayTime = delayTime; } @Override public Integer getDelayTime() { return delayTime; } }关键类DelayQueue和Delayed
DelayQueue是Java提供的延迟队列,该队列只允许实现了Delayed接口的对象入队。
调用队列的take方法时,队列会阻塞,直到有延迟到期的元素才会返回。
这个方式是可以实现一开始想要的按照15s/15s/30s/3m/10m/..指定的间隔执行任务的效果的。
定制延迟的效果只需要给出不同的IDelayTimeGenerator接口实现即可。
在和spring一起使用时,任务执行器JobActuator应该是单例的,
不过提交任务的整个操作相比于spring的一个注解,还是显得麻烦囧,使用时再封装一层会更好。
现在的实现方式是和Java的延迟队列绑定了的,但是延迟队列有多种实现方式,
例如redis,rabbitMQ等,如果能够做出更高级的抽象,合入不同的延迟队列那会更好。
此外这种实现方式性能方面也有待验证。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/75399.html
摘要:报文类型对于框架来说,报文可能有多种类型心跳类型报文认证类型报文请求类型报文响应类型报文等。接口调用请求的发送,在多条连接之间进行负载均衡。 1 需求分析 RPC 全称 Remote Procedure Call ,简单地来说,它能让使用者像调用本地方法一样,调用远程的接口,而不需要关注底层的具体细节。 例如车辆违章代办功能,如果车辆因为某种原因违章,只需要通过这个违章代办功能(它也许...
摘要:前言用组件实现简易的定时任务功能。步骤创建一个启动类注意,是关键,加了这个注解才能启动定时任务。编写定时任务方法可以实现两种定时,一种是每个一段时间执行一次方法,另一种是执行一次方法之后间隔若干时间后再执行下一次。 前言 用Spring-Context组件实现简易的定时任务功能。只可以支持较简单的业务场景,实用价值不高。如果想要投放到生产环境,需要进行一些改造。 步骤 1. pom.x...
阅读 617·2023-04-25 18:37
阅读 2779·2021-10-12 10:12
阅读 8312·2021-09-22 15:07
阅读 563·2019-08-30 15:55
阅读 3173·2019-08-30 15:44
阅读 2194·2019-08-30 15:44
阅读 1624·2019-08-30 13:03
阅读 1560·2019-08-30 12:55