diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index b570ffe..dcf5bb8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -184,7 +184,7 @@ log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + XxlJobCompleter.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index b570ffe..dcf5bb8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -184,7 +184,7 @@ log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + XxlJobCompleter.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java new file mode 100644 index 0000000..8cfb757 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -0,0 +1,99 @@ +package com.xxl.job.admin.core.complete; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * @author xuxueli 2020-10-30 20:43:10 + */ +public class XxlJobCompleter { + private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class); + + /** + * common fresh handle entrance (limit only once) + * + * @param xxlJobLog + * @return + */ + public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { + + // finish + finishJob(xxlJobLog); + + // text最大64kb 避免长度过长 + if (xxlJobLog.getHandleMsg().length() > 15000) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); + } + + // fresh handle + return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); + } + + + /** + * do somethind to finish job + */ + private static void finishJob(XxlJobLog xxlJobLog){ + + // 1、handle success, to trigger child job + String triggerChildMsg = null; + if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { + XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); + if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { + triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; + + String[] childJobIds = xxlJobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), + (i+1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), + triggerChildResult.getMsg()); + } else { + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), + (i+1), + childJobIds.length, + childJobIds[i]); + } + } + + } + } + + if (triggerChildMsg != null) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); + } + + // 2、fix_delay trigger next + // on the way + + } + + private static boolean isNumeric(String str){ + try { + int result = Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + +} diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index b570ffe..dcf5bb8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -184,7 +184,7 @@ log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + XxlJobCompleter.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java new file mode 100644 index 0000000..8cfb757 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -0,0 +1,99 @@ +package com.xxl.job.admin.core.complete; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * @author xuxueli 2020-10-30 20:43:10 + */ +public class XxlJobCompleter { + private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class); + + /** + * common fresh handle entrance (limit only once) + * + * @param xxlJobLog + * @return + */ + public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { + + // finish + finishJob(xxlJobLog); + + // text最大64kb 避免长度过长 + if (xxlJobLog.getHandleMsg().length() > 15000) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); + } + + // fresh handle + return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); + } + + + /** + * do somethind to finish job + */ + private static void finishJob(XxlJobLog xxlJobLog){ + + // 1、handle success, to trigger child job + String triggerChildMsg = null; + if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { + XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); + if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { + triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; + + String[] childJobIds = xxlJobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), + (i+1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), + triggerChildResult.getMsg()); + } else { + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), + (i+1), + childJobIds.length, + childJobIds[i]); + } + } + + } + } + + if (triggerChildMsg != null) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); + } + + // 2、fix_delay trigger next + // on the way + + } + + private static boolean isNumeric(String str){ + try { + int result = Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java deleted file mode 100644 index 266df11..0000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.xxl.job.admin.core.handle; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; -import com.xxl.job.admin.core.trigger.TriggerTypeEnum; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.MessageFormat; - -/** - * @author xuxueli 2020-10-30 20:43:10 - */ -public class XxlJobPostHandleHelper { - private static Logger logger = LoggerFactory.getLogger(XxlJobPostHandleHelper.class); - - /** - * common fresh handle entrance (limit only once) - * - * @param xxlJobLog - * @return - */ - public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { - - // finish - finishJob(xxlJobLog); - - // text最大64kb 避免长度过长 - if (xxlJobLog.getHandleMsg().length() > 15000) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); - } - - // fresh handle - return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); - } - - - /** - * do somethind to finish job - */ - private static void finishJob(XxlJobLog xxlJobLog){ - - // 1、handle success, to trigger child job - String triggerChildMsg = null; - if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { - XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); - if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { - triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; - - String[] childJobIds = xxlJobInfo.getChildJobId().split(","); - for (int i = 0; i < childJobIds.length; i++) { - int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; - if (childJobId > 0) { - - JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); - ReturnT triggerChildResult = ReturnT.SUCCESS; - - // add msg - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), - (i+1), - childJobIds.length, - childJobIds[i], - (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), - triggerChildResult.getMsg()); - } else { - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), - (i+1), - childJobIds.length, - childJobIds[i]); - } - } - - } - } - - if (triggerChildMsg != null) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); - } - - // 2、fix_delay trigger next - // on the way - - } - - private static boolean isNumeric(String str){ - try { - int result = Integer.valueOf(str); - return true; - } catch (NumberFormatException e) { - return false; - } - } - -} diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index b570ffe..dcf5bb8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -184,7 +184,7 @@ log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + XxlJobCompleter.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java new file mode 100644 index 0000000..8cfb757 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -0,0 +1,99 @@ +package com.xxl.job.admin.core.complete; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * @author xuxueli 2020-10-30 20:43:10 + */ +public class XxlJobCompleter { + private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class); + + /** + * common fresh handle entrance (limit only once) + * + * @param xxlJobLog + * @return + */ + public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { + + // finish + finishJob(xxlJobLog); + + // text最大64kb 避免长度过长 + if (xxlJobLog.getHandleMsg().length() > 15000) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); + } + + // fresh handle + return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); + } + + + /** + * do somethind to finish job + */ + private static void finishJob(XxlJobLog xxlJobLog){ + + // 1、handle success, to trigger child job + String triggerChildMsg = null; + if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { + XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); + if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { + triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; + + String[] childJobIds = xxlJobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), + (i+1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), + triggerChildResult.getMsg()); + } else { + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), + (i+1), + childJobIds.length, + childJobIds[i]); + } + } + + } + } + + if (triggerChildMsg != null) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); + } + + // 2、fix_delay trigger next + // on the way + + } + + private static boolean isNumeric(String str){ + try { + int result = Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java deleted file mode 100644 index 266df11..0000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.xxl.job.admin.core.handle; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; -import com.xxl.job.admin.core.trigger.TriggerTypeEnum; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.MessageFormat; - -/** - * @author xuxueli 2020-10-30 20:43:10 - */ -public class XxlJobPostHandleHelper { - private static Logger logger = LoggerFactory.getLogger(XxlJobPostHandleHelper.class); - - /** - * common fresh handle entrance (limit only once) - * - * @param xxlJobLog - * @return - */ - public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { - - // finish - finishJob(xxlJobLog); - - // text最大64kb 避免长度过长 - if (xxlJobLog.getHandleMsg().length() > 15000) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); - } - - // fresh handle - return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); - } - - - /** - * do somethind to finish job - */ - private static void finishJob(XxlJobLog xxlJobLog){ - - // 1、handle success, to trigger child job - String triggerChildMsg = null; - if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { - XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); - if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { - triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; - - String[] childJobIds = xxlJobInfo.getChildJobId().split(","); - for (int i = 0; i < childJobIds.length; i++) { - int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; - if (childJobId > 0) { - - JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); - ReturnT triggerChildResult = ReturnT.SUCCESS; - - // add msg - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), - (i+1), - childJobIds.length, - childJobIds[i], - (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), - triggerChildResult.getMsg()); - } else { - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), - (i+1), - childJobIds.length, - childJobIds[i]); - } - } - - } - } - - if (triggerChildMsg != null) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); - } - - // 2、fix_delay trigger next - // on the way - - } - - private static boolean isNumeric(String str){ - try { - int result = Integer.valueOf(str); - return true; - } catch (NumberFormatException e) { - return false; - } - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index 555105b..bb2cda8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -34,7 +34,7 @@ JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) - JobLogHelper.getInstance().start(); + JobCompleteHelper.getInstance().start(); // admin log report start JobLogReportHelper.getInstance().start(); @@ -55,7 +55,7 @@ JobLogReportHelper.getInstance().toStop(); // admin lose-monitor stop - JobLogHelper.getInstance().toStop(); + JobCompleteHelper.getInstance().toStop(); // admin fail-monitor stop JobFailMonitorHelper.getInstance().toStop(); diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index b570ffe..dcf5bb8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -184,7 +184,7 @@ log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + XxlJobCompleter.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java new file mode 100644 index 0000000..8cfb757 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -0,0 +1,99 @@ +package com.xxl.job.admin.core.complete; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * @author xuxueli 2020-10-30 20:43:10 + */ +public class XxlJobCompleter { + private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class); + + /** + * common fresh handle entrance (limit only once) + * + * @param xxlJobLog + * @return + */ + public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { + + // finish + finishJob(xxlJobLog); + + // text最大64kb 避免长度过长 + if (xxlJobLog.getHandleMsg().length() > 15000) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); + } + + // fresh handle + return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); + } + + + /** + * do somethind to finish job + */ + private static void finishJob(XxlJobLog xxlJobLog){ + + // 1、handle success, to trigger child job + String triggerChildMsg = null; + if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { + XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); + if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { + triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; + + String[] childJobIds = xxlJobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), + (i+1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), + triggerChildResult.getMsg()); + } else { + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), + (i+1), + childJobIds.length, + childJobIds[i]); + } + } + + } + } + + if (triggerChildMsg != null) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); + } + + // 2、fix_delay trigger next + // on the way + + } + + private static boolean isNumeric(String str){ + try { + int result = Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java deleted file mode 100644 index 266df11..0000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.xxl.job.admin.core.handle; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; -import com.xxl.job.admin.core.trigger.TriggerTypeEnum; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.MessageFormat; - -/** - * @author xuxueli 2020-10-30 20:43:10 - */ -public class XxlJobPostHandleHelper { - private static Logger logger = LoggerFactory.getLogger(XxlJobPostHandleHelper.class); - - /** - * common fresh handle entrance (limit only once) - * - * @param xxlJobLog - * @return - */ - public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { - - // finish - finishJob(xxlJobLog); - - // text最大64kb 避免长度过长 - if (xxlJobLog.getHandleMsg().length() > 15000) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); - } - - // fresh handle - return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); - } - - - /** - * do somethind to finish job - */ - private static void finishJob(XxlJobLog xxlJobLog){ - - // 1、handle success, to trigger child job - String triggerChildMsg = null; - if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { - XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); - if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { - triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; - - String[] childJobIds = xxlJobInfo.getChildJobId().split(","); - for (int i = 0; i < childJobIds.length; i++) { - int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; - if (childJobId > 0) { - - JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); - ReturnT triggerChildResult = ReturnT.SUCCESS; - - // add msg - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), - (i+1), - childJobIds.length, - childJobIds[i], - (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), - triggerChildResult.getMsg()); - } else { - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), - (i+1), - childJobIds.length, - childJobIds[i]); - } - } - - } - } - - if (triggerChildMsg != null) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); - } - - // 2、fix_delay trigger next - // on the way - - } - - private static boolean isNumeric(String str){ - try { - int result = Integer.valueOf(str); - return true; - } catch (NumberFormatException e) { - return false; - } - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index 555105b..bb2cda8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -34,7 +34,7 @@ JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) - JobLogHelper.getInstance().start(); + JobCompleteHelper.getInstance().start(); // admin log report start JobLogReportHelper.getInstance().start(); @@ -55,7 +55,7 @@ JobLogReportHelper.getInstance().toStop(); // admin lose-monitor stop - JobLogHelper.getInstance().toStop(); + JobCompleteHelper.getInstance().toStop(); // admin fail-monitor stop JobFailMonitorHelper.getInstance().toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java new file mode 100644 index 0000000..f24b1d5 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java @@ -0,0 +1,185 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.complete.XxlJobCompleter; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +/** + * job lose-monitor instance + * + * @author xuxueli 2015-9-1 18:05:56 + */ +public class JobCompleteHelper { + private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class); + + private static JobCompleteHelper instance = new JobCompleteHelper(); + public static JobCompleteHelper getInstance(){ + return instance; + } + + // ---------------------- monitor ---------------------- + + private ThreadPoolExecutor callbackThreadPool = null; + private Thread monitorThread; + private volatile boolean toStop = false; + public void start(){ + + // for callback + callbackThreadPool = new ThreadPoolExecutor( + 2, + 20, + 30L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(3000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); + } + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + r.run(); + logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); + } + }); + + + // for monitor + monitorThread = new Thread(new Runnable() { + + @Override + public void run() { + + // wait for JobTriggerPoolHelper-init + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + // monitor + while (!toStop) { + try { + // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; + Date losedTime = DateUtil.addMinutes(new Date(), -10); + List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); + + if (losedJobIds!=null && losedJobIds.size()>0) { + for (Long logId: losedJobIds) { + + XxlJobLog jobLog = new XxlJobLog(); + jobLog.setId(logId); + + jobLog.setHandleTime(new Date()); + jobLog.setHandleCode(ReturnT.FAIL_CODE); + jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); + + XxlJobCompleter.updateHandleInfoAndFinish(jobLog); + } + + } + } catch (Exception e) { + if (!toStop) { + logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); + } + } + + try { + TimeUnit.SECONDS.sleep(60); + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + } + + logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); + + } + }); + monitorThread.setDaemon(true); + monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); + monitorThread.start(); + } + + public void toStop(){ + toStop = true; + + // stop registryOrRemoveThreadPool + callbackThreadPool.shutdownNow(); + + // stop monitorThread (interrupt and wait) + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + + // ---------------------- helper ---------------------- + + public ReturnT callback(List callbackParamList) { + + callbackThreadPool.execute(new Runnable() { + @Override + public void run() { + for (HandleCallbackParam handleCallbackParam: callbackParamList) { + ReturnT callbackResult = callback(handleCallbackParam); + logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", + (callbackResult.getCode()== IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); + } + } + }); + + return ReturnT.SUCCESS; + } + + private ReturnT callback(HandleCallbackParam handleCallbackParam) { + // valid log item + XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); + if (log == null) { + return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); + } + if (log.getHandleCode() > 0) { + return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc + } + + // handle msg + StringBuffer handleMsg = new StringBuffer(); + if (log.getHandleMsg()!=null) { + handleMsg.append(log.getHandleMsg()).append("
"); + } + if (handleCallbackParam.getExecuteResult().getMsg() != null) { + handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); + } + + // success, save log + log.setHandleTime(new Date()); + log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); + log.setHandleMsg(handleMsg.toString()); + XxlJobCompleter.updateHandleInfoAndFinish(log); + + return ReturnT.SUCCESS; + } + + + +} diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index b570ffe..dcf5bb8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -184,7 +184,7 @@ log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + XxlJobCompleter.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java new file mode 100644 index 0000000..8cfb757 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -0,0 +1,99 @@ +package com.xxl.job.admin.core.complete; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * @author xuxueli 2020-10-30 20:43:10 + */ +public class XxlJobCompleter { + private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class); + + /** + * common fresh handle entrance (limit only once) + * + * @param xxlJobLog + * @return + */ + public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { + + // finish + finishJob(xxlJobLog); + + // text最大64kb 避免长度过长 + if (xxlJobLog.getHandleMsg().length() > 15000) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); + } + + // fresh handle + return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); + } + + + /** + * do somethind to finish job + */ + private static void finishJob(XxlJobLog xxlJobLog){ + + // 1、handle success, to trigger child job + String triggerChildMsg = null; + if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { + XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); + if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { + triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; + + String[] childJobIds = xxlJobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), + (i+1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), + triggerChildResult.getMsg()); + } else { + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), + (i+1), + childJobIds.length, + childJobIds[i]); + } + } + + } + } + + if (triggerChildMsg != null) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); + } + + // 2、fix_delay trigger next + // on the way + + } + + private static boolean isNumeric(String str){ + try { + int result = Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java deleted file mode 100644 index 266df11..0000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.xxl.job.admin.core.handle; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; -import com.xxl.job.admin.core.trigger.TriggerTypeEnum; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.MessageFormat; - -/** - * @author xuxueli 2020-10-30 20:43:10 - */ -public class XxlJobPostHandleHelper { - private static Logger logger = LoggerFactory.getLogger(XxlJobPostHandleHelper.class); - - /** - * common fresh handle entrance (limit only once) - * - * @param xxlJobLog - * @return - */ - public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { - - // finish - finishJob(xxlJobLog); - - // text最大64kb 避免长度过长 - if (xxlJobLog.getHandleMsg().length() > 15000) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); - } - - // fresh handle - return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); - } - - - /** - * do somethind to finish job - */ - private static void finishJob(XxlJobLog xxlJobLog){ - - // 1、handle success, to trigger child job - String triggerChildMsg = null; - if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { - XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); - if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { - triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; - - String[] childJobIds = xxlJobInfo.getChildJobId().split(","); - for (int i = 0; i < childJobIds.length; i++) { - int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; - if (childJobId > 0) { - - JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); - ReturnT triggerChildResult = ReturnT.SUCCESS; - - // add msg - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), - (i+1), - childJobIds.length, - childJobIds[i], - (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), - triggerChildResult.getMsg()); - } else { - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), - (i+1), - childJobIds.length, - childJobIds[i]); - } - } - - } - } - - if (triggerChildMsg != null) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); - } - - // 2、fix_delay trigger next - // on the way - - } - - private static boolean isNumeric(String str){ - try { - int result = Integer.valueOf(str); - return true; - } catch (NumberFormatException e) { - return false; - } - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index 555105b..bb2cda8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -34,7 +34,7 @@ JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) - JobLogHelper.getInstance().start(); + JobCompleteHelper.getInstance().start(); // admin log report start JobLogReportHelper.getInstance().start(); @@ -55,7 +55,7 @@ JobLogReportHelper.getInstance().toStop(); // admin lose-monitor stop - JobLogHelper.getInstance().toStop(); + JobCompleteHelper.getInstance().toStop(); // admin fail-monitor stop JobFailMonitorHelper.getInstance().toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java new file mode 100644 index 0000000..f24b1d5 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java @@ -0,0 +1,185 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.complete.XxlJobCompleter; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +/** + * job lose-monitor instance + * + * @author xuxueli 2015-9-1 18:05:56 + */ +public class JobCompleteHelper { + private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class); + + private static JobCompleteHelper instance = new JobCompleteHelper(); + public static JobCompleteHelper getInstance(){ + return instance; + } + + // ---------------------- monitor ---------------------- + + private ThreadPoolExecutor callbackThreadPool = null; + private Thread monitorThread; + private volatile boolean toStop = false; + public void start(){ + + // for callback + callbackThreadPool = new ThreadPoolExecutor( + 2, + 20, + 30L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(3000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); + } + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + r.run(); + logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); + } + }); + + + // for monitor + monitorThread = new Thread(new Runnable() { + + @Override + public void run() { + + // wait for JobTriggerPoolHelper-init + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + // monitor + while (!toStop) { + try { + // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; + Date losedTime = DateUtil.addMinutes(new Date(), -10); + List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); + + if (losedJobIds!=null && losedJobIds.size()>0) { + for (Long logId: losedJobIds) { + + XxlJobLog jobLog = new XxlJobLog(); + jobLog.setId(logId); + + jobLog.setHandleTime(new Date()); + jobLog.setHandleCode(ReturnT.FAIL_CODE); + jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); + + XxlJobCompleter.updateHandleInfoAndFinish(jobLog); + } + + } + } catch (Exception e) { + if (!toStop) { + logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); + } + } + + try { + TimeUnit.SECONDS.sleep(60); + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + } + + logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); + + } + }); + monitorThread.setDaemon(true); + monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); + monitorThread.start(); + } + + public void toStop(){ + toStop = true; + + // stop registryOrRemoveThreadPool + callbackThreadPool.shutdownNow(); + + // stop monitorThread (interrupt and wait) + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + + // ---------------------- helper ---------------------- + + public ReturnT callback(List callbackParamList) { + + callbackThreadPool.execute(new Runnable() { + @Override + public void run() { + for (HandleCallbackParam handleCallbackParam: callbackParamList) { + ReturnT callbackResult = callback(handleCallbackParam); + logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", + (callbackResult.getCode()== IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); + } + } + }); + + return ReturnT.SUCCESS; + } + + private ReturnT callback(HandleCallbackParam handleCallbackParam) { + // valid log item + XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); + if (log == null) { + return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); + } + if (log.getHandleCode() > 0) { + return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc + } + + // handle msg + StringBuffer handleMsg = new StringBuffer(); + if (log.getHandleMsg()!=null) { + handleMsg.append(log.getHandleMsg()).append("
"); + } + if (handleCallbackParam.getExecuteResult().getMsg() != null) { + handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); + } + + // success, save log + log.setHandleTime(new Date()); + log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); + log.setHandleMsg(handleMsg.toString()); + XxlJobCompleter.updateHandleInfoAndFinish(log); + + return ReturnT.SUCCESS; + } + + + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java deleted file mode 100644 index cf5048f..0000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.xxl.job.admin.core.thread; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.HandleCallbackParam; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.util.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.List; -import java.util.concurrent.*; - -/** - * job lose-monitor instance - * - * @author xuxueli 2015-9-1 18:05:56 - */ -public class JobLogHelper { - private static Logger logger = LoggerFactory.getLogger(JobLogHelper.class); - - private static JobLogHelper instance = new JobLogHelper(); - public static JobLogHelper getInstance(){ - return instance; - } - - // ---------------------- monitor ---------------------- - - private ThreadPoolExecutor callbackThreadPool = null; - private Thread monitorThread; - private volatile boolean toStop = false; - public void start(){ - - // for callback - callbackThreadPool = new ThreadPoolExecutor( - 2, - 20, - 30L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(3000), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); - } - }, - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - r.run(); - logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); - } - }); - - - // for monitor - monitorThread = new Thread(new Runnable() { - - @Override - public void run() { - - // wait for JobTriggerPoolHelper-init - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - - // monitor - while (!toStop) { - try { - // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; - Date losedTime = DateUtil.addMinutes(new Date(), -10); - List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); - - if (losedJobIds!=null && losedJobIds.size()>0) { - for (Long logId: losedJobIds) { - - XxlJobLog jobLog = new XxlJobLog(); - jobLog.setId(logId); - - jobLog.setHandleTime(new Date()); - jobLog.setHandleCode(ReturnT.FAIL_CODE); - jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); - - XxlJobPostHandleHelper.updateHandleInfoAndFinish(jobLog); - } - - } - } catch (Exception e) { - if (!toStop) { - logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); - } - } - - try { - TimeUnit.SECONDS.sleep(60); - } catch (Exception e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - - } - - logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); - - } - }); - monitorThread.setDaemon(true); - monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); - monitorThread.start(); - } - - public void toStop(){ - toStop = true; - - // stop registryOrRemoveThreadPool - callbackThreadPool.shutdownNow(); - - // stop monitorThread (interrupt and wait) - monitorThread.interrupt(); - try { - monitorThread.join(); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - - - // ---------------------- helper ---------------------- - - public ReturnT callback(List callbackParamList) { - - callbackThreadPool.execute(new Runnable() { - @Override - public void run() { - for (HandleCallbackParam handleCallbackParam: callbackParamList) { - ReturnT callbackResult = callback(handleCallbackParam); - logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", - (callbackResult.getCode()== IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); - } - } - }); - - return ReturnT.SUCCESS; - } - - private ReturnT callback(HandleCallbackParam handleCallbackParam) { - // valid log item - XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); - if (log == null) { - return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); - } - if (log.getHandleCode() > 0) { - return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc - } - - // handle msg - StringBuffer handleMsg = new StringBuffer(); - if (log.getHandleMsg()!=null) { - handleMsg.append(log.getHandleMsg()).append("
"); - } - if (handleCallbackParam.getExecuteResult().getMsg() != null) { - handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); - } - - // success, save log - log.setHandleTime(new Date()); - log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); - log.setHandleMsg(handleMsg.toString()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); - - return ReturnT.SUCCESS; - } - - - -} diff --git "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" index 1a66ffb..e6efd28 100644 --- "a/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" +++ "b/doc/XXL-JOB\345\256\230\346\226\271\346\226\207\346\241\243.md" @@ -2086,7 +2086,7 @@ - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; -- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、结束(complete); - 24、执行器鉴权校验:执行器启动时主动校验accessToken,为空则主动Warn告警;(已规划安全强化:AccessToken动态生成、动态启停等) ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index b570ffe..dcf5bb8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -184,7 +184,7 @@ log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + XxlJobCompleter.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java new file mode 100644 index 0000000..8cfb757 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -0,0 +1,99 @@ +package com.xxl.job.admin.core.complete; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * @author xuxueli 2020-10-30 20:43:10 + */ +public class XxlJobCompleter { + private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class); + + /** + * common fresh handle entrance (limit only once) + * + * @param xxlJobLog + * @return + */ + public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { + + // finish + finishJob(xxlJobLog); + + // text最大64kb 避免长度过长 + if (xxlJobLog.getHandleMsg().length() > 15000) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); + } + + // fresh handle + return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); + } + + + /** + * do somethind to finish job + */ + private static void finishJob(XxlJobLog xxlJobLog){ + + // 1、handle success, to trigger child job + String triggerChildMsg = null; + if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { + XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); + if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { + triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; + + String[] childJobIds = xxlJobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), + (i+1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), + triggerChildResult.getMsg()); + } else { + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), + (i+1), + childJobIds.length, + childJobIds[i]); + } + } + + } + } + + if (triggerChildMsg != null) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); + } + + // 2、fix_delay trigger next + // on the way + + } + + private static boolean isNumeric(String str){ + try { + int result = Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java deleted file mode 100644 index 266df11..0000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.xxl.job.admin.core.handle; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; -import com.xxl.job.admin.core.trigger.TriggerTypeEnum; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.MessageFormat; - -/** - * @author xuxueli 2020-10-30 20:43:10 - */ -public class XxlJobPostHandleHelper { - private static Logger logger = LoggerFactory.getLogger(XxlJobPostHandleHelper.class); - - /** - * common fresh handle entrance (limit only once) - * - * @param xxlJobLog - * @return - */ - public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { - - // finish - finishJob(xxlJobLog); - - // text最大64kb 避免长度过长 - if (xxlJobLog.getHandleMsg().length() > 15000) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); - } - - // fresh handle - return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); - } - - - /** - * do somethind to finish job - */ - private static void finishJob(XxlJobLog xxlJobLog){ - - // 1、handle success, to trigger child job - String triggerChildMsg = null; - if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { - XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); - if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { - triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; - - String[] childJobIds = xxlJobInfo.getChildJobId().split(","); - for (int i = 0; i < childJobIds.length; i++) { - int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; - if (childJobId > 0) { - - JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); - ReturnT triggerChildResult = ReturnT.SUCCESS; - - // add msg - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), - (i+1), - childJobIds.length, - childJobIds[i], - (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), - triggerChildResult.getMsg()); - } else { - triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), - (i+1), - childJobIds.length, - childJobIds[i]); - } - } - - } - } - - if (triggerChildMsg != null) { - xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); - } - - // 2、fix_delay trigger next - // on the way - - } - - private static boolean isNumeric(String str){ - try { - int result = Integer.valueOf(str); - return true; - } catch (NumberFormatException e) { - return false; - } - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index 555105b..bb2cda8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -34,7 +34,7 @@ JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) - JobLogHelper.getInstance().start(); + JobCompleteHelper.getInstance().start(); // admin log report start JobLogReportHelper.getInstance().start(); @@ -55,7 +55,7 @@ JobLogReportHelper.getInstance().toStop(); // admin lose-monitor stop - JobLogHelper.getInstance().toStop(); + JobCompleteHelper.getInstance().toStop(); // admin fail-monitor stop JobFailMonitorHelper.getInstance().toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java new file mode 100644 index 0000000..f24b1d5 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java @@ -0,0 +1,185 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.complete.XxlJobCompleter; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +/** + * job lose-monitor instance + * + * @author xuxueli 2015-9-1 18:05:56 + */ +public class JobCompleteHelper { + private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class); + + private static JobCompleteHelper instance = new JobCompleteHelper(); + public static JobCompleteHelper getInstance(){ + return instance; + } + + // ---------------------- monitor ---------------------- + + private ThreadPoolExecutor callbackThreadPool = null; + private Thread monitorThread; + private volatile boolean toStop = false; + public void start(){ + + // for callback + callbackThreadPool = new ThreadPoolExecutor( + 2, + 20, + 30L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(3000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); + } + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + r.run(); + logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); + } + }); + + + // for monitor + monitorThread = new Thread(new Runnable() { + + @Override + public void run() { + + // wait for JobTriggerPoolHelper-init + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + // monitor + while (!toStop) { + try { + // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; + Date losedTime = DateUtil.addMinutes(new Date(), -10); + List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); + + if (losedJobIds!=null && losedJobIds.size()>0) { + for (Long logId: losedJobIds) { + + XxlJobLog jobLog = new XxlJobLog(); + jobLog.setId(logId); + + jobLog.setHandleTime(new Date()); + jobLog.setHandleCode(ReturnT.FAIL_CODE); + jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); + + XxlJobCompleter.updateHandleInfoAndFinish(jobLog); + } + + } + } catch (Exception e) { + if (!toStop) { + logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); + } + } + + try { + TimeUnit.SECONDS.sleep(60); + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + } + + logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); + + } + }); + monitorThread.setDaemon(true); + monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); + monitorThread.start(); + } + + public void toStop(){ + toStop = true; + + // stop registryOrRemoveThreadPool + callbackThreadPool.shutdownNow(); + + // stop monitorThread (interrupt and wait) + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + + // ---------------------- helper ---------------------- + + public ReturnT callback(List callbackParamList) { + + callbackThreadPool.execute(new Runnable() { + @Override + public void run() { + for (HandleCallbackParam handleCallbackParam: callbackParamList) { + ReturnT callbackResult = callback(handleCallbackParam); + logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", + (callbackResult.getCode()== IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); + } + } + }); + + return ReturnT.SUCCESS; + } + + private ReturnT callback(HandleCallbackParam handleCallbackParam) { + // valid log item + XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); + if (log == null) { + return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); + } + if (log.getHandleCode() > 0) { + return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc + } + + // handle msg + StringBuffer handleMsg = new StringBuffer(); + if (log.getHandleMsg()!=null) { + handleMsg.append(log.getHandleMsg()).append("
"); + } + if (handleCallbackParam.getExecuteResult().getMsg() != null) { + handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); + } + + // success, save log + log.setHandleTime(new Date()); + log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); + log.setHandleMsg(handleMsg.toString()); + XxlJobCompleter.updateHandleInfoAndFinish(log); + + return ReturnT.SUCCESS; + } + + + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java deleted file mode 100644 index cf5048f..0000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.xxl.job.admin.core.thread; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.HandleCallbackParam; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.util.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.List; -import java.util.concurrent.*; - -/** - * job lose-monitor instance - * - * @author xuxueli 2015-9-1 18:05:56 - */ -public class JobLogHelper { - private static Logger logger = LoggerFactory.getLogger(JobLogHelper.class); - - private static JobLogHelper instance = new JobLogHelper(); - public static JobLogHelper getInstance(){ - return instance; - } - - // ---------------------- monitor ---------------------- - - private ThreadPoolExecutor callbackThreadPool = null; - private Thread monitorThread; - private volatile boolean toStop = false; - public void start(){ - - // for callback - callbackThreadPool = new ThreadPoolExecutor( - 2, - 20, - 30L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(3000), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); - } - }, - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - r.run(); - logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); - } - }); - - - // for monitor - monitorThread = new Thread(new Runnable() { - - @Override - public void run() { - - // wait for JobTriggerPoolHelper-init - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - - // monitor - while (!toStop) { - try { - // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; - Date losedTime = DateUtil.addMinutes(new Date(), -10); - List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); - - if (losedJobIds!=null && losedJobIds.size()>0) { - for (Long logId: losedJobIds) { - - XxlJobLog jobLog = new XxlJobLog(); - jobLog.setId(logId); - - jobLog.setHandleTime(new Date()); - jobLog.setHandleCode(ReturnT.FAIL_CODE); - jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); - - XxlJobPostHandleHelper.updateHandleInfoAndFinish(jobLog); - } - - } - } catch (Exception e) { - if (!toStop) { - logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); - } - } - - try { - TimeUnit.SECONDS.sleep(60); - } catch (Exception e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - - } - - logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); - - } - }); - monitorThread.setDaemon(true); - monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); - monitorThread.start(); - } - - public void toStop(){ - toStop = true; - - // stop registryOrRemoveThreadPool - callbackThreadPool.shutdownNow(); - - // stop monitorThread (interrupt and wait) - monitorThread.interrupt(); - try { - monitorThread.join(); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - - - // ---------------------- helper ---------------------- - - public ReturnT callback(List callbackParamList) { - - callbackThreadPool.execute(new Runnable() { - @Override - public void run() { - for (HandleCallbackParam handleCallbackParam: callbackParamList) { - ReturnT callbackResult = callback(handleCallbackParam); - logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", - (callbackResult.getCode()== IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); - } - } - }); - - return ReturnT.SUCCESS; - } - - private ReturnT callback(HandleCallbackParam handleCallbackParam) { - // valid log item - XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); - if (log == null) { - return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); - } - if (log.getHandleCode() > 0) { - return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc - } - - // handle msg - StringBuffer handleMsg = new StringBuffer(); - if (log.getHandleMsg()!=null) { - handleMsg.append(log.getHandleMsg()).append("
"); - } - if (handleCallbackParam.getExecuteResult().getMsg() != null) { - handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); - } - - // success, save log - log.setHandleTime(new Date()); - log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); - log.setHandleMsg(handleMsg.toString()); - XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); - - return ReturnT.SUCCESS; - } - - - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java index f728f56..3c01e94 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java @@ -1,6 +1,6 @@ package com.xxl.job.admin.service.impl; -import com.xxl.job.admin.core.thread.JobLogHelper; +import com.xxl.job.admin.core.thread.JobCompleteHelper; import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; @@ -19,7 +19,7 @@ @Override public ReturnT callback(List callbackParamList) { - return JobLogHelper.getInstance().callback(callbackParamList); + return JobCompleteHelper.getInstance().callback(callbackParamList); } @Override