Procházet zdrojové kódy

1.增加数据库分片功能

tl před 6 měsíci
rodič
revize
3ea8fd298a

+ 6 - 2
cpp-admin/pom.xml

@@ -16,7 +16,11 @@
     </description>
 
     <dependencies>
-
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.33</version>
+        </dependency>
         <!-- spring-boot-devtools -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -37,7 +41,7 @@
             <version>1.6.2</version>
         </dependency>
 
-         <!-- Mysql驱动包 -->
+        <!-- Mysql驱动包 -->
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>

+ 112 - 0
cpp-admin/src/main/java/com/cpp/web/core/config/sharding/ShardingAlgorithmTool.java

@@ -0,0 +1,112 @@
+package com.cpp.web.core.config.sharding;
+
+import cn.hutool.extra.spring.SpringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.core.env.Environment;
+import java.sql.*;
+import java.time.YearMonth;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
+/**
+ *  按月分片算法工具
+ *
+ * @date 2024/10/28 14:03
+ */
+@Slf4j
+public class ShardingAlgorithmTool {
+
+    /** 表分片符号,例:cpp_forecast_power_ultra_short_term_regulation_202201 中,分片符号为 "_" */
+    private static final String TABLE_SPLIT_SYMBOL = "_";
+
+    /** 数据库配置 */
+    private static final Environment ENV = SpringUtil.getApplicationContext().getEnvironment();
+    private static final String DATASOURCE_URL = ENV.getProperty("my.sharding.create-table.url");
+    private static final String DATASOURCE_USERNAME = ENV.getProperty("my.sharding.create-table.username");
+    private static final String DATASOURCE_PASSWORD = ENV.getProperty("my.sharding.create-table.password");
+
+
+    /**
+     * 获取所有表名
+     * @return 表名集合
+     * @param logicTableName 逻辑表
+     */
+    public static List<String> getAllTableNameBySchema(String logicTableName) {
+        List<String> tableNames = new ArrayList<>();
+        if (StringUtils.isEmpty(DATASOURCE_URL) || StringUtils.isEmpty(DATASOURCE_USERNAME) || StringUtils.isEmpty(DATASOURCE_PASSWORD)) {
+            log.error(">>>>>>>>>> 【ERROR】数据库连接配置有误,请稍后重试,URL:{}, username:{}, password:{}", DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD);
+            throw new IllegalArgumentException("数据库连接配置有误,请稍后重试");
+        }
+        try (Connection conn = DriverManager.getConnection(DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD);
+             Statement st = conn.createStatement()) {
+            try (ResultSet rs = st.executeQuery("show TABLES like '" + logicTableName + TABLE_SPLIT_SYMBOL + "%'")) {
+                while (rs.next()) {
+                    String tableName = rs.getString(1);
+                    // 匹配分表格式 例:^(cpp\_forecast\_power\_ultra\_short\_term\_regulation_\d{6})$
+                    if (tableName != null && tableName.matches(String.format("^(%s\\d{6})$", logicTableName + TABLE_SPLIT_SYMBOL))) {
+                        tableNames.add(rs.getString(1));
+                    }
+                }
+            }
+        } catch (SQLException e) {
+            log.error(">>>>>>>>>> 【ERROR】数据库连接失败,请稍后重试,原因:{}", e.getMessage(), e);
+            throw new IllegalArgumentException("数据库连接失败,请稍后重试");
+        }
+        return tableNames;
+    }
+
+
+    /**
+     * 创建分表2
+     * @param logicTableName  逻辑表
+     * @param resultTableName 真实表名,例:cpp_forecast_power_ultra_short_term_regulation_202201
+     * @return 创建结果(true创建成功,false未创建)
+     */
+    public static boolean createShardingTable(String logicTableName, String resultTableName) {
+        // 根据日期判断,当前月份之后和之前分表不进行创建
+        String month = resultTableName.replace(logicTableName + TABLE_SPLIT_SYMBOL,"");
+        if (!month.equals(logicTableName)){
+            YearMonth shardingMonth = YearMonth.parse(month, DateTimeFormatter.ofPattern("yyyyMM"));
+            if (shardingMonth.isAfter(YearMonth.now())||shardingMonth.isBefore(YearMonth.now())) {
+                return false;
+            }
+        }else {
+            return false;
+        }
+
+
+        synchronized (logicTableName.intern()) {
+            // 缓存中无此表,则建表并添加缓存
+            executeSql(Collections.singletonList("CREATE TABLE IF NOT EXISTS `" + resultTableName + "` LIKE `" + logicTableName + "`;"));
+        }
+        return true;
+    }
+
+    /**
+     * 执行SQL
+     * @param sqlList SQL集合
+     */
+    private static void executeSql(List<String> sqlList) {
+        if (StringUtils.isEmpty(DATASOURCE_URL) || StringUtils.isEmpty(DATASOURCE_USERNAME) || StringUtils.isEmpty(DATASOURCE_PASSWORD)) {
+            log.error(">>>>>>>>>> 【ERROR】数据库连接配置有误,请稍后重试,URL:{}, username:{}, password:{}", DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD);
+            throw new IllegalArgumentException("数据库连接配置有误,请稍后重试");
+        }
+        try (Connection conn = DriverManager.getConnection(DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD)) {
+            try (Statement st = conn.createStatement()) {
+                conn.setAutoCommit(false);
+                for (String sql : sqlList) {
+                    st.execute(sql);
+                }
+            } catch (Exception e) {
+                conn.rollback();
+                log.error(">>>>>>>>>> 【ERROR】数据表创建执行失败,请稍后重试,原因:{}", e.getMessage(), e);
+                throw new IllegalArgumentException("数据表创建执行失败,请稍后重试");
+            }
+        } catch (SQLException e) {
+            log.error(">>>>>>>>>> 【ERROR】数据库连接失败,请稍后重试,原因:{}", e.getMessage(), e);
+            throw new IllegalArgumentException("数据库连接失败,请稍后重试");
+        }
+    }
+
+}

+ 139 - 0
cpp-admin/src/main/java/com/cpp/web/core/config/sharding/ShardingTablesLoadRunner.java

@@ -0,0 +1,139 @@
+package com.cpp.web.core.config.sharding;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.cpp.web.domain.alarm.AbnormalAlarm;
+import com.cpp.web.domain.cloud.ForecastPowerShortTermCloud;
+import com.cpp.web.domain.cloud.ForecastPowerUltraShortTermCloud;
+import com.cpp.web.domain.cloud.NwpCloud;
+import com.cpp.web.domain.datafactory.ParsingLog;
+import com.cpp.web.domain.station.*;
+import com.cpp.web.service.alarm.AbnormalAlarmService;
+import com.cpp.web.service.cloud.ForecastPowerShortTermCloudService;
+import com.cpp.web.service.cloud.ForecastPowerUltraShortTermCloudService;
+import com.cpp.web.service.cloud.NwpCloudService;
+import com.cpp.web.service.datafactory.ParsingLogService;
+import com.cpp.web.service.station.*;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.time.LocalDateTime;
+import java.util.Date;
+
+/**
+ * <p> @Title ShardingTablesLoadRunner
+ * <p> @Description 项目启动后,读取已有分表,进行缓存
+ *
+ * @author ACGkaka
+ * @date 2022/12/20 15:41
+ */
+@Slf4j
+@Order(value = 1) // 数字越小,越先执行
+@Component
+@RequiredArgsConstructor
+public class ShardingTablesLoadRunner implements CommandLineRunner {
+
+
+    private final ForecastPowerUltraShortTermRegulationService forecastPowerUltraShortTermRegulationService;
+    private final ForecastPowerUltraShortTermCloudService forecastPowerUltraShortTermCloudService;
+    private final ForecastPowerUltraShortTermStationService forecastPowerUltraShortTermStationService;
+    private final ForecastPowerShortTermRegulationService forecastPowerShortTermRegulationService;
+    private final ForecastPowerShortTermCloudService forecastPowerShortTermCloudService;
+    private final ForecastPowerShortTermStationService forecastPowerShortTermStationService;
+    private final NwpCloudService nwpCloudService;
+    private final NwpStationService nwpStationService;
+    private final WeatherStationStatusDataService weatherStationStatusDataService;
+    private final WindTowerStatusDataService windTowerStatusDataService;
+    private final WindTurbineStatusDataService windTurbineStatusDataService;
+    private final InverterStatusDataService inverterStatusDataService;
+    private final PowerStationStatusDataService powerStationStatusDataService;
+    private final ParsingLogService parsingLogService;
+    private final ForecastPowerShortTermSendService forecastPowerShortTermSendService;
+
+
+
+    @Override
+    public void run(String... args) {
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ForecastPowerUltraShortTermRegulation> forecastPowerUltraShortTermRegulationLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        forecastPowerUltraShortTermRegulationLambdaQueryWrapper.eq(ForecastPowerUltraShortTermRegulation::getTime, new Date()).last("limit 1");
+        forecastPowerUltraShortTermRegulationService.list(forecastPowerUltraShortTermRegulationLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ForecastPowerUltraShortTermCloud> forecastPowerUltraShortTermCloudLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        forecastPowerUltraShortTermCloudLambdaQueryWrapper.eq(ForecastPowerUltraShortTermCloud::getTime, new Date()).last("limit 1");
+        forecastPowerUltraShortTermCloudService.list(forecastPowerUltraShortTermCloudLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ForecastPowerUltraShortTermStation> forecastPowerUltraShortTermStationLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        forecastPowerUltraShortTermStationLambdaQueryWrapper.eq(ForecastPowerUltraShortTermStation::getTime, new Date()).last("limit 1");
+        forecastPowerUltraShortTermStationService.list(forecastPowerUltraShortTermStationLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ForecastPowerShortTermCloud> forecastPowerShortTermCloudLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        forecastPowerShortTermCloudLambdaQueryWrapper.eq(ForecastPowerShortTermCloud::getTime, new Date()).last("limit 1");
+        forecastPowerShortTermCloudService.list(forecastPowerShortTermCloudLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ForecastPowerShortTermRegulation> forecastPowerShortTermRegulationLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        forecastPowerShortTermRegulationLambdaQueryWrapper.eq(ForecastPowerShortTermRegulation::getTime, new Date()).last("limit 1");
+        forecastPowerShortTermRegulationService.list(forecastPowerShortTermRegulationLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ForecastPowerShortTermStation> forecastPowerShortTermStationLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        forecastPowerShortTermStationLambdaQueryWrapper.eq(ForecastPowerShortTermStation::getTime, new Date()).last("limit 1");
+        forecastPowerShortTermStationService.list(forecastPowerShortTermStationLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ForecastPowerShortTermSend> forecastPowerShortTermSendLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        forecastPowerShortTermSendLambdaQueryWrapper.eq(ForecastPowerShortTermSend::getTime, new Date()).last("limit 1");
+        forecastPowerShortTermSendService.list(forecastPowerShortTermSendLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<WeatherStationStatusData> weatherStationStatusDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        weatherStationStatusDataLambdaQueryWrapper.eq(WeatherStationStatusData::getTime, new Date()).last("limit 1");
+        weatherStationStatusDataService.list(weatherStationStatusDataLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<WindTowerStatusData> windTowerStatusDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        windTowerStatusDataLambdaQueryWrapper.eq(WindTowerStatusData::getTime, new Date()).last("limit 1");
+        windTowerStatusDataService.list(windTowerStatusDataLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<WindTurbineStatusData> windTurbineStatusDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        windTurbineStatusDataLambdaQueryWrapper.eq(WindTurbineStatusData::getTime, new Date()).last("limit 1");
+        windTurbineStatusDataService.list(windTurbineStatusDataLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<InverterStatusData> inverterStatusDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        inverterStatusDataLambdaQueryWrapper.eq(InverterStatusData::getTime, new Date()).last("limit 1");
+        inverterStatusDataService.list(inverterStatusDataLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<PowerStationStatusData> powerStationStatusDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        powerStationStatusDataLambdaQueryWrapper.eq(PowerStationStatusData::getTime, new Date()).last("limit 1");
+        powerStationStatusDataService.list(powerStationStatusDataLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<ParsingLog> parsingLogLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        parsingLogLambdaQueryWrapper.eq(ParsingLog::getCreateTime, new Date()).last("limit 1");
+        parsingLogService.list(parsingLogLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<NwpCloud> nwpCloudLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        nwpCloudLambdaQueryWrapper.eq(NwpCloud::getTime, new Date()).last("limit 1");
+        nwpCloudService.list(nwpCloudLambdaQueryWrapper);
+
+        // 读取已有分表,进行缓存
+        LambdaQueryWrapper<NwpStation> nwpStationLambdaQueryWrapper = new LambdaQueryWrapper<>();
+        nwpStationLambdaQueryWrapper.eq(NwpStation::getTime, new Date()).last("limit 1");
+        nwpStationService.list(nwpStationLambdaQueryWrapper);
+
+
+
+        log.info(">>>>>>>>>> 【ShardingTablesLoadRunner】缓存已有分表成功 <<<<<<<<<<");
+    }
+}

+ 233 - 0
cpp-admin/src/main/java/com/cpp/web/core/config/sharding/TimeShardingAlgorithm.java

@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cpp.web.core.config.sharding;
+
+import com.google.common.collect.Range;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.shardingsphere.sharding.api.sharding.ShardingAutoTableAlgorithm;
+import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
+import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
+import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;
+
+import java.text.ParseException;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * 分片算法,按月分片
+ *
+ * @author tl
+ * @date 2024/10/28 11:33
+ */
+@Slf4j
+public final class TimeShardingAlgorithm implements StandardShardingAlgorithm<Date>, ShardingAutoTableAlgorithm {
+
+    /**
+     * 分片时间格式
+     */
+    private final String PATTERN = "yyyyMM";
+
+    /**
+     * 完整时间格式
+     */
+    private static final String DATE_TIME_FORMATTER = "yyyyMMdd HH:mm:ss";
+
+    /**
+     * 表分片符号,例:cpp_forecast_power_ultra_short_term_regulation_202201 中,分片符号为 "_"
+     */
+    private final String TABLE_SPLIT_SYMBOL = "_";
+
+    @Getter
+    private Properties props;
+
+    @Getter
+    private int autoTablesAmount;
+
+    @Override
+    public void init(final Properties props) {
+        this.props = props;
+    }
+
+    @Override
+    public String doSharding(final Collection<String> availableTargetNames, final PreciseShardingValue<Date> preciseShardingValue) {
+        log.info(">>>>>>>>>>>>>>>>sharding开始<<<<<<<<<<<<<<<<<<<<");
+        String logicTableName = preciseShardingValue.getLogicTableName();
+
+        /// 打印分片信息
+        log.info(">>>>>>>>>> 【INFO】精确分片,节点配置表名:{}", availableTargetNames);
+
+        Date dateTime = preciseShardingValue.getValue();
+        String resultTableName = logicTableName + "_" + DateFormatUtils.format(dateTime, PATTERN);
+
+        // 检查是否需要初始化
+        if (availableTargetNames.size() == 1) {
+            // 如果只有一个表,说明需要获取所有表名
+            List<String> allTableNameBySchema = ShardingAlgorithmTool.getAllTableNameBySchema(logicTableName);
+            availableTargetNames.clear();
+            availableTargetNames.addAll(allTableNameBySchema);
+            autoTablesAmount = allTableNameBySchema.size();
+//            return resultTableName;
+        }
+        String shardingTableAndCreate = getShardingTableAndCreate(logicTableName, resultTableName, availableTargetNames);
+        log.info(">>>>>>>>>>>>>>>>sharding结束<<<<<<<<<<<<<<<<<<<<");
+        return shardingTableAndCreate;
+    }
+
+    @Override
+    public Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeShardingValue<Date> rangeShardingValue) {
+        log.info(">>>>>>>>>>>>>>>>sharding开始<<<<<<<<<<<<<<<<<<<<");
+        String logicTableName = rangeShardingValue.getLogicTableName();
+
+        /// 打印分片信息
+        log.info(">>>>>>>>>> 【INFO】范围分片,节点配置表名:{}", availableTargetNames);
+
+        // between and 的起始值
+        Range<Date> valueRange = rangeShardingValue.getValueRange();
+        boolean hasLowerBound = valueRange.hasLowerBound();
+        boolean hasUpperBound = valueRange.hasUpperBound();
+
+        // 获取最大值和最小值
+        Long min = hasLowerBound ? valueRange.lowerEndpoint().getTime() : getLowerEndpoint(availableTargetNames).getTime();
+
+        Long longMonth = 86400000L;
+        Long max = (hasUpperBound ? valueRange.upperEndpoint().getTime() : getUpperEndpoint(availableTargetNames).getTime()) + longMonth;
+        // 循环计算分表范围
+        Set<String> resultTableNames = new LinkedHashSet<>();
+
+        //确保小于最小时间表时原始表可被查询
+        String minString = Collections.min(availableTargetNames.stream().filter(a->a.contains(logicTableName+TABLE_SPLIT_SYMBOL)).collect(Collectors.toList()));
+        if (minString.replace(logicTableName+TABLE_SPLIT_SYMBOL,"").compareTo(DateFormatUtils.format(min, PATTERN))<=0){
+            resultTableNames.add(logicTableName);
+        }
+
+        while (min < max || min == max) {
+            String tableName = logicTableName + TABLE_SPLIT_SYMBOL + DateFormatUtils.format(min, PATTERN);
+            resultTableNames.add(tableName);
+            min += longMonth;
+        }
+        Set<String> shardingTablesAndCreate = getShardingTablesAndCreate(logicTableName, resultTableNames, availableTargetNames);
+        log.info(">>>>>>>>>>>>>>>>sharding结束<<<<<<<<<<<<<<<<<<<<");
+        return shardingTablesAndCreate;
+    }
+
+    @Override
+    public String getType() {
+        return "AUTO_CUSTOM";
+    }
+
+    // --------------------------------------------------------------------------------------------------------------
+    // 私有方法
+    // --------------------------------------------------------------------------------------------------------------
+
+
+    /**
+     * 检查分表获取的表名是否存在,不存在则自动建表
+     *
+     * @param logicTableName       逻辑表
+     * @param resultTableNames     真实表名,例:cpp_forecast_power_ultra_short_term_regulation_202201
+     * @param availableTargetNames 可用的数据库表名
+     * @return 存在于数据库中的真实表名集合
+     */
+    public Set<String> getShardingTablesAndCreate(String logicTableName, Collection<String> resultTableNames, Collection<String> availableTargetNames) {
+        return resultTableNames.stream().map(o -> getShardingTableAndCreate(logicTableName, o, availableTargetNames)).collect(Collectors.toSet());
+    }
+
+    /**
+     * 检查分表获取的表名是否存在,不存在则自动建表
+     *
+     * @param logicTableName  逻辑表
+     * @param resultTableName 真实表名,例:cpp_forecast_power_ultra_short_term_regulation_202201
+     * @return 确认存在于数据库中的真实表名
+     */
+    private String getShardingTableAndCreate(String logicTableName, String resultTableName, Collection<String> availableTargetNames) {
+        // 缓存中有此表则返回,没有则判断创建
+        if (availableTargetNames.contains(resultTableName)) {
+            log.info(">>>>>>>>>> 【INFO】范围分片结束,节点配置表名:{}", availableTargetNames);
+            return resultTableName;
+        } else {
+            log.info(">>>>>>>>>> 【INFO】范围分片需要创建表,节点配置表名:{}", availableTargetNames);
+            // 检查分表获取的表名不存在,需要自动建表
+            boolean isSuccess = ShardingAlgorithmTool.createShardingTable(logicTableName, resultTableName);
+            if (isSuccess) {
+                // 如果建表成功,需要更新缓存
+                availableTargetNames.add(resultTableName);
+                autoTablesAmount++;
+                log.info(">>>>>>>>>> 【INFO】范围分片结束,节点配置表名:{}", availableTargetNames);
+                return resultTableName;
+            } else {
+                // 如果建表失败,返回逻辑空表
+                log.info(">>>>>>>>>> 【INFO】范围分片结束,节点配置表名:{}", availableTargetNames);
+                return logicTableName;
+            }
+        }
+    }
+
+    /**
+     * 获取 最小分片值
+     *
+     * @param tableNames 表名集合
+     * @return 最小分片值
+     */
+    private Date getLowerEndpoint(Collection<String> tableNames) {
+        Optional<Date> optional = tableNames.stream()
+                .map(o -> {
+                    try {
+                        return DateUtils.parseDate(o.replace(TABLE_SPLIT_SYMBOL, "") + "01 00:00:00", DATE_TIME_FORMATTER);
+                    } catch (ParseException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                })
+                .min(Comparator.comparing(Function.identity()));
+        if (optional.isPresent()) {
+            return optional.get();
+        } else {
+            log.error(">>>>>>>>>> 【ERROR】获取数据最小分表失败,请稍后重试,tableName:{}", tableNames);
+            throw new IllegalArgumentException("获取数据最小分表失败,请稍后重试");
+        }
+    }
+
+    /**
+     * 获取 最大分片值
+     *
+     * @param tableNames 表名集合
+     * @return 最大分片值
+     */
+    private Date getUpperEndpoint(Collection<String> tableNames) {
+        Optional<Date> optional = tableNames.stream()
+                .map(o -> {
+                    try {
+                        return DateUtils.parseDate(o.replace(TABLE_SPLIT_SYMBOL, "") + "01 00:00:00", DATE_TIME_FORMATTER);
+                    } catch (ParseException e) {
+                        e.printStackTrace();
+                        return null;
+                    }
+                })
+                .max(Comparator.comparing(Function.identity()));
+        if (optional.isPresent()) {
+            return optional.get();
+        } else {
+            log.error(">>>>>>>>>> 【ERROR】获取数据最大分表失败,请稍后重试,tableName:{}", tableNames);
+            throw new IllegalArgumentException("获取数据最大分表失败,请稍后重试");
+        }
+    }
+}

+ 1 - 1
cpp-admin/src/main/java/com/cpp/web/service/accuracy/impl/CalculateForecastPowerUltraShortTermCloudImpl.java

@@ -37,7 +37,7 @@ public class CalculateForecastPowerUltraShortTermCloudImpl implements CalculateI
     public List<AccuracyPassRate> calc(Date startTime, Date endTime, List<PowerStationStatusData> powerStationStatusDataList, List<ElectricField> electricFieldList, List<String> formulaTypes, String province) {
 
         List<AccuracyPassRate> accuracyPassRates = new ArrayList<>();
-        String cdq_ac_point = sysConfigService.selectConfigByKey("cdq_ac_point");
+        String cdq_ac_point = sysConfigService.selectConfigByKey("cdqHowLongAgo");
         int ago = 1;
         try {
             ago = Integer.parseInt(cdq_ac_point);

+ 1 - 1
cpp-admin/src/main/java/com/cpp/web/service/accuracy/impl/CalculateForecastPowerUltraShortTermRegulationImpl.java

@@ -36,7 +36,7 @@ public class CalculateForecastPowerUltraShortTermRegulationImpl implements Calcu
     public List<AccuracyPassRate> calc(Date startTime, Date endTime, List<PowerStationStatusData> powerStationStatusDataList, List<ElectricField> electricFieldList, List<String> formulaTypes, String province) {
 
         List<AccuracyPassRate> accuracyPassRates = new ArrayList<>();
-        String cdq_ac_point = sysConfigService.selectConfigByKey("cdq_ac_point");
+        String cdq_ac_point = sysConfigService.selectConfigByKey("cdqHowLongAgo");
         int ago = 1;
         try {
             ago = Integer.parseInt(cdq_ac_point);

+ 1 - 1
cpp-admin/src/main/java/com/cpp/web/service/accuracy/impl/CalculateForecastPowerUltraShortTermStationImpl.java

@@ -36,7 +36,7 @@ public class CalculateForecastPowerUltraShortTermStationImpl implements Calculat
     public List<AccuracyPassRate> calc(Date startTime, Date endTime, List<PowerStationStatusData> powerStationStatusDataList, List<ElectricField> electricFieldList, List<String> formulaTypes, String province) {
 
         List<AccuracyPassRate> accuracyPassRates = new ArrayList<>();
-        String cdq_ac_point = sysConfigService.selectConfigByKey("cdq_ac_point");
+        String cdq_ac_point = sysConfigService.selectConfigByKey("cdqHowLongAgo");
         int ago = 1;
         try {
             ago = Integer.parseInt(cdq_ac_point);

+ 39 - 6
cpp-admin/src/main/java/com/cpp/web/service/datafactory/impl/ParsingCdqServiceImpl.java

@@ -2,24 +2,23 @@ package com.cpp.web.service.datafactory.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.cpp.common.utils.spring.SpringUtils;
 import com.cpp.system.service.ISysConfigService;
 import com.cpp.web.domain.datafactory.BaseParsing;
 import com.cpp.web.domain.datafactory.ParsingCdq;
 import com.cpp.web.domain.datafactory.dto.ParsingConfParam;
 import com.cpp.web.domain.datafactory.dto.ParsingResultDto;
 import com.cpp.web.domain.datafactory.enums.FileTypeEnum;
+import com.cpp.web.domain.station.ForecastPowerUltraShortTermRegulation;
 import com.cpp.web.domain.station.ForecastPowerUltraShortTermStation;
 import com.cpp.web.mapper.datafactory.ParsingCdqMapper;
-import com.cpp.web.service.datafactory.DataStore;
 import com.cpp.web.service.datafactory.ParsingCdqService;
 import com.cpp.web.service.datafactory.ParsingInterface;
-import com.cpp.web.service.datafactory.ScheduledHelper;
+import com.cpp.web.service.station.ForecastPowerUltraShortTermRegulationService;
 import com.cpp.web.service.station.ForecastPowerUltraShortTermStationService;
-import com.cpp.web.utils.DateTimeUtil;
 import com.cpp.web.utils.ParsingFieldUtil;
 import com.cpp.web.utils.ParsingFileUtil;
 import com.cpp.web.utils.ParsingUtil;
-import lombok.AllArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -31,7 +30,6 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * 超短期解析业务层实现类
@@ -103,6 +101,11 @@ public class ParsingCdqServiceImpl extends ServiceImpl<ParsingCdqMapper, Parsing
                                                     .eq("station_code", stationCode)
                                                     .between("time", forecastPowerUltraShortTermStationList.get(0).getTime(), forecastPowerUltraShortTermStationList.get(forecastPowerUltraShortTermStationList.size() - 1).getTime()));
                                     forecastPowerUltraShortTermStationService.saveBatch(forecastPowerUltraShortTermStationList);
+                                    String syncRegulation = configService.selectConfigByKey("sync_regulation");
+                                    if (!syncRegulation.isEmpty() && syncRegulation.equals("true")) {
+                                        syncRegulation(stationCode, forecastPowerUltraShortTermStationList, genTime);
+                                    }
+
                                     log.info("[" + stationCode + "]解析超短期文件:{} 成功! O(∩_∩)O", file.getName());
                                     parsingResultDto.setStatus("success");
                                 } else {
@@ -194,7 +197,6 @@ public class ParsingCdqServiceImpl extends ServiceImpl<ParsingCdqMapper, Parsing
         try {
             ParsingConfParam config = ParsingFieldUtil.getConfig(sign);
 
-
             String[] lineNumber = config.getLineNumber().split("-");
             int startLine = ParsingUtil.noParseInt(lineNumber[0]);
             int endLine = ParsingUtil.noParseInt(lineNumber[1]);
@@ -227,4 +229,35 @@ public class ParsingCdqServiceImpl extends ServiceImpl<ParsingCdqMapper, Parsing
     }
 
 
+    private final ForecastPowerUltraShortTermRegulationService powerUltraShortTermRegulationService;
+
+    /**
+     * 同步调控后超短期数据表
+     *
+     * @param stationCode
+     * @param forecastPowerUltraShortTermStationList
+     * @param genTime
+     */
+    public void syncRegulation(String stationCode, List<ForecastPowerUltraShortTermStation> forecastPowerUltraShortTermStationList, Date genTime) {
+        log.info("同步调控后超短期数据表");
+        List<ForecastPowerUltraShortTermRegulation> forecastPowerUltraShortTermRegulationList = new ArrayList<>();
+        for (ForecastPowerUltraShortTermStation forecastPowerUltraShortTermStation : forecastPowerUltraShortTermStationList) {
+            ForecastPowerUltraShortTermRegulation forecastPowerUltraShortTermRegulation = new ForecastPowerUltraShortTermRegulation();
+            forecastPowerUltraShortTermRegulation.setGenTime(forecastPowerUltraShortTermStation.getGenTime());
+            forecastPowerUltraShortTermRegulation.setForecastHowLongAgo(forecastPowerUltraShortTermStation.getForecastHowLongAgo());
+            forecastPowerUltraShortTermRegulation.setFpValue(forecastPowerUltraShortTermStation.getFpValue());
+            forecastPowerUltraShortTermRegulation.setTime(forecastPowerUltraShortTermStation.getTime());
+            forecastPowerUltraShortTermRegulation.setOpenCapacity(forecastPowerUltraShortTermStation.getOpenCapacity());
+            forecastPowerUltraShortTermRegulation.setStationCode(stationCode);
+            forecastPowerUltraShortTermRegulationList.add(forecastPowerUltraShortTermRegulation);
+        }
+
+        SpringUtils.getBean(ParsingRcdqServiceImpl.class).scheduleAddCache(forecastPowerUltraShortTermRegulationList, stationCode);
+        powerUltraShortTermRegulationService.remove(
+                new QueryWrapper<ForecastPowerUltraShortTermRegulation>()
+                        .eq("gen_time", genTime)
+                        .eq("station_code", stationCode)
+                        .between("time", forecastPowerUltraShortTermRegulationList.get(0).getTime(), forecastPowerUltraShortTermRegulationList.get(forecastPowerUltraShortTermRegulationList.size() - 1).getTime()));
+        powerUltraShortTermRegulationService.saveBatch(forecastPowerUltraShortTermRegulationList);
+    }
 }

+ 21 - 59
cpp-admin/src/main/resources/application-druid.yml

@@ -1,61 +1,23 @@
 # 数据源配置
 spring:
-    datasource:
-        type: com.alibaba.druid.pool.DruidDataSource
-        driverClassName: com.mysql.cj.jdbc.Driver
-        druid:
-            # 主库数据源
-            master:
-                url: jdbc:mysql://192.168.1.205:3306/cpp?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
-                username: root
-                password: '!QAZ2root'
-            # 从库数据源
-            slave:
-                # 从数据源开关/默认关闭
-                enabled: false
-                url:
-                username:
-                password:
-            # 初始连接数
-            initialSize: 5
-            # 最小连接池数量
-            minIdle: 10
-            # 最大连接池数量
-            maxActive: 20
-            # 配置获取连接等待超时的时间
-            maxWait: 60000
-            # 配置连接超时时间
-            connectTimeout: 30000
-            # 配置网络超时时间
-            socketTimeout: 60000
-            # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
-            timeBetweenEvictionRunsMillis: 60000
-            # 配置一个连接在池中最小生存的时间,单位是毫秒
-            minEvictableIdleTimeMillis: 300000
-            # 配置一个连接在池中最大生存的时间,单位是毫秒
-            maxEvictableIdleTimeMillis: 900000
-            # 配置检测连接是否有效
-            validationQuery: SELECT 1 FROM DUAL
-            testWhileIdle: true
-            testOnBorrow: false
-            testOnReturn: false
-            webStatFilter:
-                enabled: true
-            statViewServlet:
-                enabled: true
-                # 设置白名单,不填则允许所有访问
-                allow:
-                url-pattern: /druid/*
-                # 控制台管理用户名和密码
-                login-username: ruoyi
-                login-password: 123456
-            filter:
-                stat:
-                    enabled: true
-                    # 慢SQL记录
-                    log-slow-sql: true
-                    slow-sql-millis: 1000
-                    merge-sql: true
-                wall:
-                    config:
-                        multi-statement-allow: true
+  ### 处理连接池冲突 #####
+  main:
+    allow-bean-definition-overriding: true
+  datasource:
+    driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver
+    url: jdbc:shardingsphere:classpath:sharding.yaml
+
+
+
+my:
+  sharding:
+    create-table:
+      url: jdbc:mysql://192.168.1.205:3306/cpp?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
+      username: root
+      password: '!QAZ2root'
+    slave:
+      # 从数据源开关/默认关闭
+      enabled: false
+      url:
+      username:
+      password:

+ 176 - 0
cpp-admin/src/main/resources/sharding.yaml

@@ -0,0 +1,176 @@
+dataSources:
+  cpp:
+    dataSourceClassName: com.alibaba.druid.pool.DruidDataSource
+    driverClassName: com.mysql.cj.jdbc.Driver
+    url: jdbc:mysql://192.168.1.205:3306/cpp?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+    username: root
+    password: '!QAZ2root'
+    # 初始连接数
+    initialSize: 5
+    # 最小连接池数量
+    minIdle: 10
+    # 最大连接池数量
+    maxActive: 20
+    # 配置获取连接等待超时的时间
+    maxWait: 60000
+    # 配置连接超时时间
+    connectTimeout: 30000
+    # 配置网络超时时间
+    socketTimeout: 60000
+    # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+    timeBetweenEvictionRunsMillis: 60000
+    # 配置一个连接在池中最小生存的时间,单位是毫秒
+    minEvictableIdleTimeMillis: 300000
+    # 配置一个连接在池中最大生存的时间,单位是毫秒
+    maxEvictableIdleTimeMillis: 900000
+    # 配置检测连接是否有效
+    validationQuery: SELECT 1 FROM DUAL
+    testWhileIdle: true
+    testOnBorrow: false
+    testOnReturn: false
+    webStatFilter:
+      enabled: true
+    statViewServlet:
+      enabled: true
+      # 设置白名单,不填则允许所有访问
+      allow:
+      url-pattern: /druid/*
+      # 控制台管理用户名和密码
+      login-username: ruoyi
+      login-password: 123456
+    filter:
+      stat:
+        enabled: true
+        # 慢SQL记录
+        log-slow-sql: true
+        slow-sql-millis: 1000
+        merge-sql: true
+      wall:
+        config:
+          multi-statement-allow: true
+#  cpp:
+#    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
+#    driverClassName: com.mysql.jdbc.Driver
+#    jdbcUrl: jdbc:mysql://192.168.1.205:3306/cpp?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
+#    username: root
+#    password: '!QAZ2root'
+#    hikari:
+#      minimum-idle: 5 # 最小空闲连接数量
+#      idle-timeout: 600000 # 空闲连接存活最大时间,默认为10分钟(600000毫秒),此处设置为3分钟(180000毫秒)
+#      maximum-pool-size: 20 # 连接池最大连接数,默认为10
+#      auto-commit: true # 自动提交事务,默认为true
+#      max-lifetime: 1800000 # 连接最大生命周期,默认为30分钟(1800000毫秒)
+#      connection-timeout: 30000 # 数据库连接超时时间,默认为30秒(30000毫秒)
+rules:
+- !SHARDING
+  tables:
+    cpp_forecast_power_ultra_short_term_regulation:
+      actualDataNodes: cpp.cpp_forecast_power_ultra_short_term_regulation
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_forecast_power_ultra_short_term_station:
+      actualDataNodes: cpp.cpp_forecast_power_ultra_short_term_station
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_forecast_power_ultra_short_term_cloud:
+      actualDataNodes: cpp.cpp_forecast_power_ultra_short_term_cloud
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_forecast_power_short_term_station:
+      actualDataNodes: cpp.cpp_forecast_power_short_term_station
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_forecast_power_short_term_send:
+      actualDataNodes: cpp.cpp_forecast_power_short_term_send
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_forecast_power_short_term_regulation:
+      actualDataNodes: cpp.cpp_forecast_power_short_term_regulation
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_forecast_power_short_term_cloud:
+      actualDataNodes: cpp.cpp_forecast_power_short_term_cloud
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_power_station_status_data:
+      actualDataNodes: cpp.cpp_power_station_status_data
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_parsing_log:
+      actualDataNodes: cpp.cpp_parsing_log
+      tableStrategy:
+        standard:
+          shardingColumn: create_time
+          shardingAlgorithmName: auto-custom
+    cpp_weather_station_status_data:
+      actualDataNodes: cpp.cpp_weather_station_status_data
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_wind_tower_status_data:
+      actualDataNodes: cpp.cpp_wind_tower_status_data
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_wind_turbine_status_data:
+      actualDataNodes: cpp.cpp_wind_turbine_status_data
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_inverter_status_data:
+      actualDataNodes: cpp.cpp_inverter_status_data
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_nwp_cloud:
+      actualDataNodes: cpp.cpp_nwp_cloud
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+    cpp_nwp_station:
+      actualDataNodes: cpp.cpp_nwp_station
+      tableStrategy:
+        standard:
+          shardingColumn: time
+          shardingAlgorithmName: auto-custom
+  shardingAlgorithms:
+    auto-custom:
+      type: CLASS_BASED
+      props:
+        strategy: standard
+        algorithmClassName: com.cpp.web.core.config.sharding.TimeShardingAlgorithm
+    interval:
+      type: INTERVAL
+      props:
+        datetime-pattern: 'yyyy-MM'
+        datetime-lower: '2024-01'
+        datetime-upper: '2025-12'
+        sharding-suffix-pattern: 'yyyyMM'
+        # 间隔大小
+        datetime-interval-amount: 1
+        datetime-interval-unit: 'Months'
+
+props:
+  sql-show: false
+  allow-range-query-with-inline-sharding: true

+ 17 - 0
cpp-common/pom.xml

@@ -16,6 +16,13 @@
     </description>
 
     <dependencies>
+        <!-- Sharding-JDBC -->
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-jdbc-core</artifactId>
+            <version>5.3.0</version>
+        </dependency>
+
         <dependency>
             <groupId>cn.hutool</groupId>
             <artifactId>hutool-all</artifactId>
@@ -128,4 +135,14 @@
         </dependency>
     </dependencies>
 
+<!--    <dependencyManagement>-->
+<!--        <dependencies>-->
+<!--            <dependency>-->
+<!--                <groupId>org.yaml</groupId>-->
+<!--                <artifactId>snakeyaml</artifactId>-->
+<!--                <version>1.33</version>-->
+<!--            </dependency>-->
+<!--        </dependencies>-->
+<!--    </dependencyManagement>-->
+
 </project>