Ver Fonte

定时任务补充

刘桐 há 5 dias atrás
pai
commit
0bcf5ce9ef

+ 5 - 0
xvji-admin/pom.xml

@@ -98,6 +98,11 @@
             <scope>test</scope>
         </dependency>
 
+        <!--<dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-mongodb</artifactId>
+        </dependency>-->
+
         <!-- Spring Boot测试依赖 -->
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 5 - 7
xvji-admin/src/main/java/com/xvji/quartz/TaskQuartzJob.java

@@ -28,27 +28,25 @@ public class TaskQuartzJob implements ApplicationContextAware {
     }
 
     /**
-     * 手动获取Spring Bean(解决Quartz注入问题)
+     * 手动获取Bean
      */
     private <T> T getBean(Class<T> clazz) {
         return applicationContext.getBean(clazz);
     }
 
     /**
-     * 预测任务执行入口:通过是否抛异常判断成功/失败(适配void返回值)
+     * 预测任务执行入口:通过是否抛异常判断成功/失败
      */
     public void executePredictTask(Integer taskId) {
         // 记录开始时间,用于计算执行耗时
         long startTime = System.currentTimeMillis();
 
         try {
-            // 1. 手动获取TaskComponentExecutor
+            // 获取TaskComponentExecutor
             TaskComponentExecutor componentExecutor = getBean(TaskComponentExecutor.class);
-
-            // 2. 执行核心业务逻辑(void方法,无返回值)
             componentExecutor.executeComponents(taskId.longValue(), 1);
 
-            // 3. 若未抛异常视为成功
+            // 未抛异常视为成功
             PredictTaskService predictTaskService = getBean(PredictTaskService.class);
             PredictTask predictTask = new PredictTask();
             predictTask.setPTaskId(taskId.longValue());
@@ -61,7 +59,7 @@ public class TaskQuartzJob implements ApplicationContextAware {
             log.info("预测任务[ID:{}]执行成功,状态更新为1,耗时:{}ms", taskId, costTime);
 
         } catch (Exception e) {
-            // 4. 若抛异常,视为失败,记录异常信息
+            // 若抛异常,视为失败,记录异常信息
             PredictTaskService predictTaskService = getBean(PredictTaskService.class);
             PredictTask predictTask = new PredictTask();
             predictTask.setPTaskId(taskId.longValue());

+ 1 - 0
xvji-admin/src/main/java/com/xvji/service/impl/PredictTaskServiceImpl.java

@@ -109,6 +109,7 @@ public class PredictTaskServiceImpl extends ServiceImpl<PredictTaskMapper, Predi
             log.info("检测到是定时任务同步的更新,跳过反向同步");
             // 移除标记,避免影响显示
             task.setPTaskName(task.getPTaskName().replace("[SYNCED]", ""));
+            this.updateById(task);
             return;
         }
 

+ 8 - 8
xvji-admin/src/main/java/com/xvji/service/impl/TrainTaskServiceImpl.java

@@ -25,11 +25,11 @@ public class TrainTaskServiceImpl extends ServiceImpl<TrainTaskMapper, TrainTask
 
     @Autowired
     private ComponentService componentService;
-    // 新增:注入定时任务Service
+    // 注入定时任务Service
     @Autowired
     private ISysJobService sysJobService;
 
-    // 新增:日志对象
+    // 日志对象
     private static final Logger log = LoggerFactory.getLogger(TrainTaskServiceImpl.class);
 
     /**
@@ -82,15 +82,15 @@ public class TrainTaskServiceImpl extends ServiceImpl<TrainTaskMapper, TrainTask
             return false;
         }
 
-        // 1. 删除关联组件(原有逻辑不变)
+        //  删除关联组件
         QueryWrapper<Component> componentQueryWrapper = new QueryWrapper<>();
         componentQueryWrapper.eq("TASK_ID", taskId).eq("TASK_TYPE", 0);
         componentService.remove(componentQueryWrapper);
 
-        // 2. 删除训练任务(原有逻辑不变)
+        // 删除训练任务
         this.removeById(taskId);
 
-        // 3. 新增:同步删除关联的定时任务
+        // 同步删除关联的定时任务
         String targetInvokeTarget = "com.xvji.quartz.TaskQuartzJob.executeTrainTask(" + taskId + ")";
         sysJobService.deleteJobByParam(targetInvokeTarget);
         log.info("训练任务[ID:{}]及关联组件、定时任务已删除", taskId);
@@ -117,7 +117,7 @@ public class TrainTaskServiceImpl extends ServiceImpl<TrainTaskMapper, TrainTask
         }
 
         Long taskId = task.getTTaskId();
-        // 精确匹配定时任务的invoke_target(格式:executeTrainTask(xxx))
+        // 精确匹配定时任务的invoke_target
         String exactInvokeTarget = "com.xvji.quartz.TaskQuartzJob.executeTrainTask(" + taskId + ")";
         SysJob sysJob = sysJobService.selectJobByParam(exactInvokeTarget);
 
@@ -151,14 +151,14 @@ public class TrainTaskServiceImpl extends ServiceImpl<TrainTaskMapper, TrainTask
     @Transactional
     @Override
     public boolean updateTrainTask(TrainTask task) {
-        // 1. 先更新训练任务表
+        // 先更新训练任务表
         boolean updateResult = this.updateById(task);
         if (!updateResult) {
             log.error("训练任务[ID:{}]更新失败,同步定时任务终止", task.getTTaskId());
             return false;
         }
 
-        // 2. 调用已有同步方法,同步到定时任务
+        // 调用已有同步方法,同步到定时任务
         syncToSysJob(task);
         return true;
     }

+ 161 - 59
xvji-admin/src/main/java/com/xvji/service/task/TaskComponentExecutor.java

@@ -35,17 +35,17 @@ public class TaskComponentExecutor {
      * @param taskType 任务类型0=训练任务,1=预测任务
      */
     public void executeComponents(Long taskId, Integer taskType) {
-        // 1. 查询关联组件
+        // 查询关联组件
         List<Component> components = componentMapper.selectByTaskIdAndType(taskId, taskType);
         if (components.isEmpty()) {
             log.warn("任务ID:{} 没有关联的组件,无需执行", taskId);
             return;
         }
 
-        // 记录任务开始时间(用于计算总耗时)
+        // 记录任务开始时间 计算总耗时
         long taskStartTime = System.currentTimeMillis();
 
-        // 2. 循环调用组件(失败立即终止并抛异常)
+        // 循环调用组件 失败立即终止并抛异常
         for (Component component : components) {
             // 跳过未启用的组件
             if (!component.getIsEnable()) {
@@ -60,54 +60,86 @@ public class TaskComponentExecutor {
 
             log.info("开始调用组件[{}],接口地址:{},参数:{}",
                     componentName, interfaceUrl, params);
-
+            String compType1 = component.getComponentType();
+            if (compType1 != null && compType1.contains("分析报告")) {
+                String db = (String) params.get("mongodb_database");
+                String table = (String) params.get("mongodb_read_table");
+                log.info("=== 分析报告组件调用前检查 ===\n数据源:db={}, table={}\n参数列表:{}",
+                        db, table, params);
+            }
             try {
                 // 调用组件接口
                 ResponseEntity<String> response = FormDataHttpUtil.postFormData(interfaceUrl, params);
                 long costTime = System.currentTimeMillis() - componentStartTime;
+                String responseBody = response.getBody();
+
+                // 解析响应体判断业务是否成功
+                boolean isBusinessSuccess = true;
+                if (responseBody != null && !responseBody.isEmpty()) {
+                    try {
+                        JSONObject resultJson = JSONObject.parseObject(responseBody);
+                        // 处理success为布尔值或数字的情况
+                        if (resultJson.containsKey("success")) {
+                            Object successObj = resultJson.get("success");
+                            if (successObj instanceof Boolean) {
+                                isBusinessSuccess = (Boolean) successObj;
+                            } else if (successObj instanceof Number) {
+                                isBusinessSuccess = ((Number) successObj).intValue() == 1;
+                            } else {
+                                isBusinessSuccess = "1".equals(successObj.toString()) ||
+                                        "true".equalsIgnoreCase(successObj.toString());
+                            }
+                        }
+                    } catch (Exception e) {
+                        log.warn("解析组件[{}]响应体失败,无法判断业务状态", componentName, e);
+                    }
+                }
 
-                // 处理HTTP响应(非2xx状态视为失败)
-                if (!response.getStatusCode().is2xxSuccessful()) {
-                    // 提取接口返回的错误信息(优先获取msg字段)
-                    String errorDetail = extractErrorMessage(response.getBody());
+                // 处理HTTP响应和业务响应
+                if (!response.getStatusCode().is2xxSuccessful() || !isBusinessSuccess) {
+                    // 提取错误信息
+                    String errorDetail = extractErrorMessage(responseBody);
                     String errorMsg = String.format(
-                            "组件[%s]调用失败(耗时:%dms),状态码:%d,错误信息:%s",
+                            "组件[%s]执行失败(耗时:%dms),HTTP状态码:%d,业务状态:%s,错误信息:%s",
                             componentName,
                             costTime,
                             response.getStatusCodeValue(),
+                            isBusinessSuccess ? "成功" : "失败",
                             errorDetail
                     );
                     log.error(errorMsg);
-                    throw new RuntimeException(errorMsg); // 非2xx状态抛异常
+                    throw new RuntimeException(errorMsg); // 抛异常终止任务
                 }
 
-                log.info("组件[{}]调用成功(耗时:{}ms),返回结果:{}",
+                log.info("组件[{}]业务执行成功(耗时:{}ms),返回结果:{}",
                         componentName,
-                        System.currentTimeMillis() - componentStartTime,
-                        response.getBody());
-
-                // 修复:将componentType改为component.getComponentType(),result改为response.getBody()
-                if ("分析报告".equals(component.getComponentType())) {
-                    JSONObject resultJson = JSONObject.parseObject(response.getBody());
-                    // 优先取file_path(报告文件路径),无则取msg(结果信息)
-                    String report = resultJson.getString("file_path");
-                    if (report == null || report.isEmpty()) {
-                        report = resultJson.getString("msg");
-                    }
-                    analysisReportResult.set(report); // 存储到ThreadLocal
+                        costTime,
+                        responseBody);
+
+                // 分析报告组件:只提取file_path
+                String compType = component.getComponentType();
+                log.info("当前组件类型:{},原始值:{}", compType, compType);
+                if (compType != null && compType.contains("分析报告")) {
+                    JSONObject resultJson = JSONObject.parseObject(responseBody);
+                    String reportFilePath = resultJson.getString("file_path");
+                    analysisReportResult.set(reportFilePath);
+                    String db = (String) params.get("mongodb_database");
+                    String table = (String) params.get("mongodb_read_table");
+                    log.info("=== 分析报告组件日志 ===\n数据源:db={}, table={}\n提取的file_path:{}",
+                            db, table, reportFilePath);
                 }
 
             } catch (Exception e) {
-                // 捕获所有异常(包括连接失败、超时等),包装后重新抛出
                 long costTime = System.currentTimeMillis() - componentStartTime;
+                String rootErrorMsg = getRootErrorMessage(e);
                 String errorMsg = String.format(
                         "组件[%s]调用异常(耗时:%dms):%s",
                         componentName,
                         costTime,
-                        e.getMessage()
+                        rootErrorMsg
                 );
                 log.error(errorMsg, e);
-                throw new RuntimeException(errorMsg, e); // 向外层抛出异常,携带详细信息
+                throw new RuntimeException(errorMsg, e); // 向外层抛出异常
             }
         }
 
@@ -116,47 +148,117 @@ public class TaskComponentExecutor {
     }
 
     /**
-     * 纯字符串解析方式提取msg字段,不依赖任何JSON库
-     * 适用于任何环境,兼容性100%
+     * 提取错误信息
      */
-    private String extractErrorMessage(String responseBody) {
+    /*private String extractErrorMessage(String responseBody) {
         if (responseBody == null || responseBody.trim().isEmpty()) {
             return "无错误信息";
         }
 
-        // 标准化处理:去除空格和制表符,便于匹配
-        String normalized = responseBody.replaceAll("\\s+", "");
-
-        // 查找"msg"字段的几种常见格式:"msg":"内容" 或 'msg':'内容' 或 msg:"内容"
-        String[] patterns = {"\"msg\":\"", "'msg':'", "msg\":\"", "\"msg':\"", "'msg\":\""};
-        for (String pattern : patterns) {
-            int startIndex = normalized.indexOf(pattern);
-            if (startIndex != -1) {
-                // 找到匹配的模式,计算内容起始位置
-                int contentStart = startIndex + pattern.length();
-
-                // 查找内容结束位置(引号)
-                int contentEnd = normalized.indexOf("\"", contentStart);
-                if (contentEnd == -1) {
-                    contentEnd = normalized.indexOf("'", contentStart);
-                }
-                if (contentEnd == -1) {
-                    contentEnd = normalized.indexOf("}", contentStart);
-                }
-                if (contentEnd == -1) {
-                    contentEnd = normalized.indexOf(",", contentStart);
-                }
+        System.out.println("=== 数据获取组件完整错误响应 ===");
+        System.out.println(responseBody);
+        System.out.println("===============================");
 
-                // 提取内容
-                if (contentEnd > contentStart) {
-                    return normalized.substring(contentStart, contentEnd);
+        // 1. 优先用fastjson解析
+        try {
+            JSONObject resultJson = JSONObject.parseObject(responseBody);
+            if (resultJson.containsKey("msg")) {
+                String msg = resultJson.getString("msg");
+                // 处理msg中的转义字符
+                if (msg != null) {
+                    msg = msg.replace("\\n", "\n").replace("\\t", "  ");
+                    // 限制长度
+                    int maxLength = Math.min(msg.length(), 1000);
+                    return msg.substring(0, maxLength) + (msg.length() > 1000 ? "\n...(日志过长,已截断)" : "");
                 }
             }
+        } catch (Exception e) {
+            log.debug("响应体不是标准JSON,尝试字符串提取,错误:{}", e.getMessage());
+        }
+
+        // 2. JSON解析失败,用原始字符串匹配
+        // 不压缩字符串(保留\n、空格),直接匹配msg字段
+        String lowerBody = responseBody.toLowerCase();
+        int msgStart = lowerBody.indexOf("\"msg\":\"");
+        if (msgStart == -1) {
+            msgStart = lowerBody.indexOf("'msg':'"); // 匹配 'msg':'xxx' 格式
+        }
+        if (msgStart == -1) {
+            msgStart = lowerBody.indexOf("msg\":\""); // 匹配 msg:"xxx" 格式(无引号)
         }
 
-        // 如果没找到msg字段,返回原始响应的前200个字符
-        int maxLength = Math.min(responseBody.length(), 200);
-        return responseBody.substring(0, maxLength) + (responseBody.length() > 200 ? "..." : "");
+        if (msgStart != -1) {
+            // 计算msg内容的起始位置(跳过"msg":")
+            int contentStart = msgStart + (lowerBody.startsWith("\"msg\":\"", msgStart) ? 6 :
+                    lowerBody.startsWith("'msg':'", msgStart) ? 6 : 5);
+            // 找msg内容的结束位置(匹配对应的引号)
+            char quoteChar = responseBody.charAt(msgStart + 1); // 取msg字段的引号类型("或')
+            int contentEnd = responseBody.indexOf(quoteChar, contentStart);
+            if (contentEnd == -1) {
+                contentEnd = responseBody.indexOf("}", contentStart); // 无引号则找JSON结束符
+            }
+            if (contentEnd == -1) {
+                contentEnd = responseBody.indexOf(",", contentStart); // 找逗号
+            }
+
+            if (contentEnd > contentStart) {
+                String msg = responseBody.substring(contentStart, contentEnd);
+                // 处理转义字符
+                msg = msg.replace("\\n", "\n").replace("\\t", "  ");
+                int maxLength = Math.min(msg.length(), 1000);
+                return msg.substring(0, maxLength) + (msg.length() > 1000 ? "\n...(日志过长,已截断)" : "");
+            }
+        }
+
+        // 3. 最终 fallback:返回原始响应的前1000个字符
+        int maxLength = Math.min(responseBody.length(), 1000);
+        String fallback = responseBody.substring(0, maxLength);
+        fallback = fallback.replace("\\n", "\n").replace("\\t", "  ");
+
+        String finalResult = fallback + (responseBody.length() > 1000 ? "\n...(日志过长,已截断)" : "");
+        log.debug("extractErrorMessage 处理后:长度={},内容前200字符:{}",
+                finalResult.length(),
+                finalResult.substring(0, Math.min(finalResult.length(), 200)));
+
+        return fallback + (responseBody.length() > 1000 ? "\n...(日志过长,已截断)" : "");
+    }*/
+
+    private String extractErrorMessage(String responseBody) {
+    if (responseBody == null || responseBody.trim().isEmpty()) {
+        return "无错误信息";
+    }
+
+    // 优先解析JSON 无论是否有msg字段,都返回完整内容
+    try {
+        JSONObject resultJson = JSONObject.parseObject(responseBody);
+        // 若有msg字段,返回完整msg;若无,返回完整JSON
+        if (resultJson.containsKey("msg")) {
+            String msg = resultJson.getString("msg");
+            return msg != null ? msg.replace("\\n", "\n").replace("\\t", "  ") : responseBody;
+        } else {
+            // 无msg字段,返回完整JSON
+            return responseBody.replace("\\n", "\n").replace("\\t", "  ");
+        }
+    } catch (Exception e) {
+        log.debug("响应体不是标准JSON,返回完整原始内容");
+    }
+
+    //JSON解析失败,返回完整原始响应
+    return responseBody.replace("\\n", "\n").replace("\\t", "  ");
+}
+
+
+    /**
+     * 提取错误信息
+     */
+    private String getRootErrorMessage(Throwable e) {
+        Throwable root = e;
+        // 找到最底层的异常
+        while (root.getCause() != null && root.getCause() != root) {
+            root = root.getCause();
+        }
+        // 返回原始错误信息
+        return root.getMessage() != null ? root.getMessage() : e.getMessage();
     }
 
     // 提供方法获取分析报告结果
@@ -164,8 +266,8 @@ public class TaskComponentExecutor {
         return analysisReportResult.get();
     }
 
-    // 任务执行完成后清理ThreadLocal(避免内存泄漏)
+    // 任务执行完成后清理ThreadLocal 避免内存泄漏
     public void clearAnalysisReportResult() {
         analysisReportResult.remove();
     }
-}
+}

+ 2 - 2
xvji-admin/src/main/java/com/xvji/web/controller/TrainTaskController.java

@@ -203,7 +203,7 @@ public class TrainTaskController {
             if (analysisReport != null) {
                 Boolean isEnable = parseEnableValue(analysisReport.get("isEnable"), "分析报告");
                 String type = (String) analysisReport.get("componentType");
-                String url = "http://ds1:10099/analysis_report_small";
+                String url = "http://ds3:10012/analysis_report_small";
                 Component component = createComponent(taskId, type, analysisReport, isEnable , url);
                 if (componentService.save(component)) {
                     componentIds.add(component.getComponentId());
@@ -475,7 +475,7 @@ public class TrainTaskController {
                 String type = (String) analysisReport.get("componentType");
                 String url = "";
                 if ("分析报告".equals(type)){
-                    url = "http://ds1:10099/analysis_report_small";
+                    url = "http://ds3:10012/analysis_report_small";
                 }
                 Component component = createComponent(tTaskId, type, analysisReport, isEnable , url);
                 if (componentService.save(component)) {

+ 2 - 0
xvji-quartz/src/main/java/com/xvji/quartz/mapper/SysJobLogMapper.java

@@ -2,12 +2,14 @@ package com.xvji.quartz.mapper;
 
 import java.util.List;
 import com.xvji.quartz.domain.SysJobLog;
+import org.apache.ibatis.annotations.Mapper;
 
 /**
  * 调度任务日志信息 数据层
  * 
  * @author ruoyi
  */
+@Mapper
 public interface SysJobLogMapper
 {
     /**

+ 8 - 12
xvji-quartz/src/main/java/com/xvji/quartz/service/impl/SysJobServiceImpl.java

@@ -167,7 +167,6 @@ public class SysJobServiceImpl implements ISysJobService, ApplicationContextAwar
      * 
      * @param job 调度信息
      */
-    // 修改 SysJobServiceImpl 的 changeStatus 方法
     @Override
     @Transactional(rollbackFor = Exception.class)
     public int changeStatus(SysJob job) throws SchedulerException {
@@ -176,14 +175,14 @@ public class SysJobServiceImpl implements ISysJobService, ApplicationContextAwar
         Long jobId = job.getJobId();
         String jobGroup = job.getJobGroup();
 
-        // 1. 处理定时任务自身状态变更
+        // 处理定时任务自身状态变更
         if (ScheduleConstants.Status.NORMAL.getValue().equals(cronStatus)) {
             rows = resumeJob(job); // 启用:cronStatus=0
         } else if (ScheduleConstants.Status.PAUSE.getValue().equals(cronStatus)) {
             rows = pauseJob(job); // 暂停:cronStatus=1
         }
 
-        // 2. 同步业务任务状态(核心:按你的需求映射)
+        // 同步业务任务状态
         if (rows > 0 && ("PREDICT_TASK".equals(jobGroup) || "TRAIN_TASK".equals(jobGroup))) {
             try {
                 // 解析业务任务ID
@@ -198,10 +197,10 @@ public class SysJobServiceImpl implements ISysJobService, ApplicationContextAwar
                 // 按你的需求映射状态:
                 Integer bizStatus;
                 if ("1".equals(cronStatus)) {
-                    // 定时任务暂停(1) 业务任务状态=2(未启用)
+                    // 定时任务暂停(1) 业务任务状态=2(未启用)
                     bizStatus = 2;
                 } else {
-                    // 定时任务启用(0) 保留上次执行结果(不修改)
+                    // 定时任务启用(0) 保留上次执行结果(不修改)
                     log.info("定时任务[ID:{}]启用,业务任务[ID:{}]保留上次状态(0失败/1成功)", jobId, taskId);
                     return rows;
                 }
@@ -293,7 +292,6 @@ public class SysJobServiceImpl implements ISysJobService, ApplicationContextAwar
             updateSchedulerJob(job, properties.getJobGroup());
 
             log.info("定时任务[ID:{}]更新后,开始同步业务任务", job.getJobId());
-            // 1. 预测任务同步(原有逻辑不变)
             if ("PREDICT_TASK".equals(job.getJobGroup())) {
                 try {
                     Class<?> invokeTargetUtilsClass = Class.forName("com.xvji.utils.InvokeTargetUtils");
@@ -332,7 +330,6 @@ public class SysJobServiceImpl implements ISysJobService, ApplicationContextAwar
                     log.error("同步预测任务失败", e);
                 }
             }
-            // 2. 新增:训练任务同步(和预测任务逻辑完全对齐)
             else if ("TRAIN_TASK".equals(job.getJobGroup())) {
                 try {
                     // 解析训练任务ID
@@ -416,19 +413,19 @@ public class SysJobServiceImpl implements ISysJobService, ApplicationContextAwar
 
     @Override
     public SysJob selectJobByParam(String jobParam) {
-        // 调用 Mapper 新增的 selectJobByField,完整匹配 invoke_target
+        // 完整匹配 invoke_target
         return jobMapper.selectJobByField("invoke_target", jobParam);
     }
 
     @Override
     public void deleteJobByParam(String jobParam) {
-        // 调用 Mapper 新增的 deleteJobByField,完整匹配 invoke_target
+        // 完整匹配 invoke_target
         jobMapper.deleteJobByField("invoke_target", jobParam);
     }
 
     @Override
     public SysJob getSysJobByCondition(String conditionValue) {
-        // 模糊匹配 invoke_target(如包含"executePredictTask(1001)")
+        // 模糊匹配 invoke_target
         SysJob queryJob = new SysJob();
         queryJob.setInvokeTarget("%" + conditionValue + "%");
         List<SysJob> jobList = jobMapper.selectJobList(queryJob);
@@ -437,14 +434,13 @@ public class SysJobServiceImpl implements ISysJobService, ApplicationContextAwar
 
     @Override
     public void deleteSysJobByCondition(String field, String value) {
-        // 按指定字段删除(如按 invoke_target 完整匹配)
+        // 按指定字段删除
         jobMapper.deleteJobByField(field, value);
     }
 
     @Override
     public boolean updateSysJob(SysJob sysJob) {
         try {
-            // 复用若依原生 updateJob 方法
             int rows = updateJob(sysJob);
             return rows > 0;
         } catch (SchedulerException | TaskException e) {