|
@@ -6,6 +6,7 @@ import org.quartz.JobDataMap;
|
|
|
import org.quartz.JobKey;
|
|
|
import org.quartz.Scheduler;
|
|
|
import org.quartz.SchedulerException;
|
|
|
+import org.springframework.beans.BeansException;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
@@ -16,14 +17,19 @@ import com.xvji.quartz.mapper.SysJobMapper;
|
|
|
import com.xvji.quartz.service.ISysJobService;
|
|
|
import com.xvji.quartz.util.CronUtils;
|
|
|
import com.xvji.quartz.util.ScheduleUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.context.ApplicationContextAware;
|
|
|
+import java.lang.reflect.Method;
|
|
|
|
|
|
/**
|
|
|
* 定时任务调度信息 服务层
|
|
|
*
|
|
|
- * @author ruoyi
|
|
|
+ * @author lt
|
|
|
*/
|
|
|
@Service
|
|
|
-public class SysJobServiceImpl implements ISysJobService
|
|
|
+public class SysJobServiceImpl implements ISysJobService, ApplicationContextAware
|
|
|
{
|
|
|
@Autowired
|
|
|
private Scheduler scheduler;
|
|
@@ -31,6 +37,17 @@ public class SysJobServiceImpl implements ISysJobService
|
|
|
@Autowired
|
|
|
private SysJobMapper jobMapper;
|
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(SysJobServiceImpl.class);
|
|
|
+
|
|
|
+
|
|
|
+ //Spring上下文(用于获取其他模块的Bean)
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
+ this.applicationContext = applicationContext;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据)
|
|
|
*/
|
|
@@ -150,20 +167,79 @@ public class SysJobServiceImpl implements ISysJobService
|
|
|
*
|
|
|
* @param job 调度信息
|
|
|
*/
|
|
|
+ // 修改 SysJobServiceImpl 的 changeStatus 方法
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
- public int changeStatus(SysJob job) throws SchedulerException
|
|
|
- {
|
|
|
+ public int changeStatus(SysJob job) throws SchedulerException {
|
|
|
int rows = 0;
|
|
|
- String status = job.getStatus();
|
|
|
- if (ScheduleConstants.Status.NORMAL.getValue().equals(status))
|
|
|
- {
|
|
|
- rows = resumeJob(job);
|
|
|
+ String cronStatus = job.getStatus(); // 定时任务状态:0=启用,1=暂停
|
|
|
+ Long jobId = job.getJobId();
|
|
|
+ String jobGroup = job.getJobGroup();
|
|
|
+
|
|
|
+ // 1. 处理定时任务自身状态变更
|
|
|
+ if (ScheduleConstants.Status.NORMAL.getValue().equals(cronStatus)) {
|
|
|
+ rows = resumeJob(job); // 启用:cronStatus=0
|
|
|
+ } else if (ScheduleConstants.Status.PAUSE.getValue().equals(cronStatus)) {
|
|
|
+ rows = pauseJob(job); // 暂停:cronStatus=1
|
|
|
}
|
|
|
- else if (ScheduleConstants.Status.PAUSE.getValue().equals(status))
|
|
|
- {
|
|
|
- rows = pauseJob(job);
|
|
|
+
|
|
|
+ // 2. 同步业务任务状态(核心:按你的需求映射)
|
|
|
+ if (rows > 0 && ("PREDICT_TASK".equals(jobGroup) || "TRAIN_TASK".equals(jobGroup))) {
|
|
|
+ try {
|
|
|
+ // 解析业务任务ID
|
|
|
+ Class<?> invokeTargetUtilsClass = Class.forName("com.xvji.utils.InvokeTargetUtils");
|
|
|
+ Method parseMethod = invokeTargetUtilsClass.getMethod("parseTaskIdFromInvokeTarget", String.class);
|
|
|
+ Long taskId = (Long) parseMethod.invoke(null, job.getInvokeTarget());
|
|
|
+ if (taskId == null) {
|
|
|
+ log.error("定时任务[ID:{}]状态变更,无法解析业务任务ID", jobId);
|
|
|
+ return rows;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 按你的需求映射状态:
|
|
|
+ Integer bizStatus;
|
|
|
+ if ("1".equals(cronStatus)) {
|
|
|
+ // 定时任务暂停(1)→ 业务任务状态=2(未启用)
|
|
|
+ bizStatus = 2;
|
|
|
+ } else {
|
|
|
+ // 定时任务启用(0)→ 保留上次执行结果(不修改)
|
|
|
+ log.info("定时任务[ID:{}]启用,业务任务[ID:{}]保留上次状态(0失败/1成功)", jobId, taskId);
|
|
|
+ return rows;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 同步到预测任务
|
|
|
+ if ("PREDICT_TASK".equals(jobGroup)) {
|
|
|
+ Object predictTaskService = applicationContext.getBean("predictTaskServiceImpl");
|
|
|
+ Class<?> predictTaskClass = Class.forName("com.xvji.domain.PredictTask");
|
|
|
+ Object predictTask = predictTaskClass.newInstance();
|
|
|
+ // 只更新ID和状态
|
|
|
+ Method setPTaskId = predictTaskClass.getMethod("setPTaskId", Long.class);
|
|
|
+ Method setPTaskStatus = predictTaskClass.getMethod("setPTaskStatus", Integer.class);
|
|
|
+ setPTaskId.invoke(predictTask, taskId);
|
|
|
+ setPTaskStatus.invoke(predictTask, bizStatus);
|
|
|
+ // 调用更新
|
|
|
+ Method updateMethod = predictTaskService.getClass().getMethod("updateById", Object.class);
|
|
|
+ updateMethod.invoke(predictTaskService, predictTask);
|
|
|
+ log.info("定时任务[ID:{}]暂停,同步预测任务[ID:{}]状态为2(未启用)", jobId, taskId);
|
|
|
+ }
|
|
|
+ // 同步到训练任务
|
|
|
+ else if ("TRAIN_TASK".equals(jobGroup)) {
|
|
|
+ Object trainTaskService = applicationContext.getBean("trainTaskServiceImpl");
|
|
|
+ Class<?> trainTaskClass = Class.forName("com.xvji.domain.TrainTask");
|
|
|
+ Object trainTask = trainTaskClass.newInstance();
|
|
|
+ Method setTTaskId = trainTaskClass.getMethod("setTTaskId", Long.class);
|
|
|
+ Method setTTaskStatus = trainTaskClass.getMethod("setTTaskStatus", Integer.class);
|
|
|
+ setTTaskId.invoke(trainTask, taskId);
|
|
|
+ setTTaskStatus.invoke(trainTask, bizStatus);
|
|
|
+ Method updateMethod = trainTaskService.getClass().getMethod("updateById", Object.class);
|
|
|
+ updateMethod.invoke(trainTaskService, trainTask);
|
|
|
+ log.info("定时任务[ID:{}]暂停,同步训练任务[ID:{}]状态为2(未启用)", jobId, taskId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("定时任务[ID:{}]状态变更,同步业务任务失败", jobId, e);
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
return rows;
|
|
|
}
|
|
|
|
|
@@ -210,17 +286,103 @@ public class SysJobServiceImpl implements ISysJobService
|
|
|
*/
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
- public int updateJob(SysJob job) throws SchedulerException, TaskException
|
|
|
- {
|
|
|
+ public int updateJob(SysJob job) throws SchedulerException, TaskException {
|
|
|
SysJob properties = selectJobById(job.getJobId());
|
|
|
int rows = jobMapper.updateJob(job);
|
|
|
- if (rows > 0)
|
|
|
- {
|
|
|
+ if (rows > 0) {
|
|
|
updateSchedulerJob(job, properties.getJobGroup());
|
|
|
+
|
|
|
+ log.info("定时任务[ID:{}]更新后,开始同步业务任务", job.getJobId());
|
|
|
+ // 1. 预测任务同步(原有逻辑不变)
|
|
|
+ if ("PREDICT_TASK".equals(job.getJobGroup())) {
|
|
|
+ try {
|
|
|
+ Class<?> invokeTargetUtilsClass = Class.forName("com.xvji.utils.InvokeTargetUtils");
|
|
|
+ Method parseMethod = invokeTargetUtilsClass.getMethod("parseTaskIdFromInvokeTarget", String.class);
|
|
|
+ Long taskId = (Long) parseMethod.invoke(null, job.getInvokeTarget());
|
|
|
+
|
|
|
+ if (taskId == null) {
|
|
|
+ log.error("无法解析预测任务ID");
|
|
|
+ return rows;
|
|
|
+ }
|
|
|
+
|
|
|
+ Integer taskStatus = ScheduleConstants.Status.NORMAL.getValue().equals(job.getStatus()) ? 1 : 0;
|
|
|
+ Object predictTaskService = applicationContext.getBean("predictTaskServiceImpl");
|
|
|
+ if (predictTaskService == null) {
|
|
|
+ log.error("未找到predictTaskService");
|
|
|
+ return rows;
|
|
|
+ }
|
|
|
+
|
|
|
+ Class<?> predictTaskClass = Class.forName("com.xvji.domain.PredictTask");
|
|
|
+ Object predictTask = predictTaskClass.newInstance();
|
|
|
+ Method setPTaskId = predictTaskClass.getMethod("setPTaskId", Long.class);
|
|
|
+ Method setPCronExpression = predictTaskClass.getMethod("setPCronExpression", String.class);
|
|
|
+ Method setPTaskName = predictTaskClass.getMethod("setPTaskName", String.class);
|
|
|
+ Method setPTaskStatus = predictTaskClass.getMethod("setPTaskStatus", Integer.class);
|
|
|
+
|
|
|
+ setPTaskId.invoke(predictTask, taskId);
|
|
|
+ setPCronExpression.invoke(predictTask, job.getCronExpression());
|
|
|
+ setPTaskName.invoke(predictTask, job.getJobName() + "[SYNCED]");
|
|
|
+ setPTaskStatus.invoke(predictTask, taskStatus);
|
|
|
+
|
|
|
+ Method updateMethod = predictTaskService.getClass().getMethod("updateById", Object.class);
|
|
|
+ updateMethod.invoke(predictTaskService, predictTask);
|
|
|
+ log.info("同步预测任务[ID:{}]成功", taskId);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("同步预测任务失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 2. 新增:训练任务同步(和预测任务逻辑完全对齐)
|
|
|
+ else if ("TRAIN_TASK".equals(job.getJobGroup())) {
|
|
|
+ try {
|
|
|
+ // 解析训练任务ID
|
|
|
+ Class<?> invokeTargetUtilsClass = Class.forName("com.xvji.utils.InvokeTargetUtils");
|
|
|
+ Method parseMethod = invokeTargetUtilsClass.getMethod("parseTaskIdFromInvokeTarget", String.class);
|
|
|
+ Long taskId = (Long) parseMethod.invoke(null, job.getInvokeTarget());
|
|
|
+
|
|
|
+ if (taskId == null) {
|
|
|
+ log.error("无法解析训练任务ID");
|
|
|
+ return rows;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 状态转换
|
|
|
+ Integer taskStatus = ScheduleConstants.Status.NORMAL.getValue().equals(job.getStatus()) ? 1 : 0;
|
|
|
+
|
|
|
+ // 获取训练任务Service
|
|
|
+ Object trainTaskService = applicationContext.getBean("trainTaskServiceImpl");
|
|
|
+ if (trainTaskService == null) {
|
|
|
+ log.error("未找到trainTaskService");
|
|
|
+ return rows;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 反射设置训练任务属性
|
|
|
+ Class<?> trainTaskClass = Class.forName("com.xvji.domain.TrainTask");
|
|
|
+ Object trainTask = trainTaskClass.newInstance();
|
|
|
+ Method setTTaskId = trainTaskClass.getMethod("setTTaskId", Long.class);
|
|
|
+ Method setTCronExpression = trainTaskClass.getMethod("setTCronExpression", String.class);
|
|
|
+ Method setTTaskName = trainTaskClass.getMethod("setTTaskName", String.class);
|
|
|
+ Method setTTaskStatus = trainTaskClass.getMethod("setTTaskStatus", Integer.class);
|
|
|
+
|
|
|
+ setTTaskId.invoke(trainTask, taskId);
|
|
|
+ setTCronExpression.invoke(trainTask, job.getCronExpression());
|
|
|
+ setTTaskName.invoke(trainTask, job.getJobName() + "[SYNCED]"); // 防循环标记
|
|
|
+ setTTaskStatus.invoke(trainTask, taskStatus);
|
|
|
+
|
|
|
+ // 调用更新
|
|
|
+ Method updateMethod = trainTaskService.getClass().getMethod("updateById", Object.class);
|
|
|
+ updateMethod.invoke(trainTaskService, trainTask);
|
|
|
+ log.info("同步训练任务[ID:{}]成功", taskId);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("同步训练任务失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
return rows;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* 更新任务
|
|
|
*
|
|
@@ -251,4 +413,43 @@ public class SysJobServiceImpl implements ISysJobService
|
|
|
{
|
|
|
return CronUtils.isValid(cronExpression);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SysJob selectJobByParam(String jobParam) {
|
|
|
+ // 调用 Mapper 新增的 selectJobByField,完整匹配 invoke_target
|
|
|
+ return jobMapper.selectJobByField("invoke_target", jobParam);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deleteJobByParam(String jobParam) {
|
|
|
+ // 调用 Mapper 新增的 deleteJobByField,完整匹配 invoke_target
|
|
|
+ jobMapper.deleteJobByField("invoke_target", jobParam);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SysJob getSysJobByCondition(String conditionValue) {
|
|
|
+ // 模糊匹配 invoke_target(如包含"executePredictTask(1001)")
|
|
|
+ SysJob queryJob = new SysJob();
|
|
|
+ queryJob.setInvokeTarget("%" + conditionValue + "%");
|
|
|
+ List<SysJob> jobList = jobMapper.selectJobList(queryJob);
|
|
|
+ return jobList != null && !jobList.isEmpty() ? jobList.get(0) : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deleteSysJobByCondition(String field, String value) {
|
|
|
+ // 按指定字段删除(如按 invoke_target 完整匹配)
|
|
|
+ jobMapper.deleteJobByField(field, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean updateSysJob(SysJob sysJob) {
|
|
|
+ try {
|
|
|
+ // 复用若依原生 updateJob 方法
|
|
|
+ int rows = updateJob(sysJob);
|
|
|
+ return rows > 0;
|
|
|
+ } catch (SchedulerException | TaskException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|