Przeglądaj źródła

bigsql数据恢复功能 每30分钟执行一次

小王 1 rok temu
rodzic
commit
ae20614cd6

+ 17 - 2
pom.xml

@@ -64,7 +64,22 @@
         <dependency>
             <groupId>com.taosdata.jdbc</groupId>
             <artifactId>taos-jdbcdriver</artifactId>
-            <version>3.2.4</version>
+            <version>3.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ant</groupId>
+            <artifactId>ant</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.junrar</groupId>
+            <artifactId>junrar</artifactId>
+            <version>0.7</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId> <!-- 解决 sftp 连接 -->
+            <version>0.1.55</version>
         </dependency>
     </dependencies>
 
@@ -103,7 +118,7 @@
                 <artifactId>spring-boot-maven-plugin</artifactId>
                 <version>2.2.1.RELEASE</version>
                 <configuration>
-                    <mainClass>com.example.datadump.DataDumpApplication</mainClass>
+                    <mainClass>com.example.DataDumpApplication</mainClass>
                 </configuration>
                 <executions>
                     <execution>

+ 21 - 0
src/main/java/com/example/DataDumpApplication.java

@@ -0,0 +1,21 @@
+package com.example;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * Spring Boot entry point for the data-dump/restore application.
+ *
+ * <p>{@code @Configuration} was removed: {@code @SpringBootApplication} is already
+ * meta-annotated with it, so declaring both is redundant.
+ */
+@SpringBootApplication
+@Slf4j
+@EnableCaching
+@EnableScheduling
+public class DataDumpApplication {
+
+    /**
+     * Boots the Spring application context.
+     *
+     * @param args command-line arguments forwarded to Spring Boot
+     */
+    public static void main(String[] args) {
+        SpringApplication.run(DataDumpApplication.class, args);
+    }
+
+}

+ 19 - 0
src/main/java/com/example/bigsql/entity/UploadBigsqlFileRecordBaseVO.java

@@ -0,0 +1,19 @@
+package com.example.bigsql.entity;
+
+import lombok.Data;
+
+/**
+ * Value object mapping a row of the {@code dc_upload_bigsql_file_record} table
+ * (see {@code CreateDatabasesByV3File.queryRecords}). Getters/setters/equals/
+ * hashCode/toString are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class UploadBigsqlFileRecordBaseVO {
+    /**
+     * Primary key id.
+     */
+    private Integer id;
+
+
+    // Name of the uploaded SQL archive; used to locate the file on disk.
+    private String fileName;
+
+    // Free-form remark column.
+    private String remark;
+
+    // Processing status: "0" = pending, "1" = restored, "2" = failed
+    // (values written back by CreateDatabasesByV3File.updateRecords).
+    private String status;
+
+}

+ 8 - 0
src/main/java/com/example/bigsql/handle/IFileHandle.java

@@ -0,0 +1,8 @@
+package com.example.bigsql.handle;
+
+/**
+ * Handler for a single line of text, intended as a per-line callback when
+ * reading a file.
+ *
+ * Created by renhongqiang on 2019-07-02 14:11
+ */
+public interface IFileHandle {
+    /**
+     * Processes one line of input.
+     *
+     * @param line the line to handle
+     * @throws Exception if handling fails; propagation is left to the caller
+     */
+    void handle(String line) throws Exception;
+}

+ 31 - 0
src/main/java/com/example/bigsql/job/SqlRestorationJob.java

@@ -0,0 +1,31 @@
+package com.example.bigsql.job;
+
+import com.example.bigsql.service.CreateDatabasesByV3File;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+/**
+ * 从sql文件恢复数据库定时任务
+ * <p>
+ * &#064;Auther:  whc
+ * &#064;Date:  2024/3/08
+ */
+
+@Slf4j
+@Service
+@EnableScheduling
+public class SqlRestorationJob {
+    @Autowired
+    private CreateDatabasesByV3File createDatabasesByV3File;
+
+    //每半小时执行一次
+    @Scheduled(fixedDelay = 1800000)
+    public void execute() throws Exception {
+        log.info("开始执行读取sql文件定时任务");
+        createDatabasesByV3File.redSqlZip();
+        log.info("结束执行读取sql文件定时任务");
+    }
+}

+ 313 - 0
src/main/java/com/example/bigsql/service/CreateDatabasesByV3File.java

@@ -0,0 +1,313 @@
+package com.example.bigsql.service;
+
+
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.util.ZipUtil;
+import com.example.bigsql.entity.UploadBigsqlFileRecordBaseVO;
+import com.example.bigsql.util.ExecuteShellUtil;
+import com.example.bigsql.util.JyDbUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.*;
+import java.util.Date;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Parses large v3-format SQL dump archives and restores them: creates a fresh
+ * database per station, replays the CREATE/DROP statements, converts the
+ * INSERT statements to per-table CSV files, and bulk-loads those via mysqlsh
+ * over SSH.
+ *
+ * NOTE(review): MySQL and SSH credentials are hardcoded throughout this class;
+ * they should be externalized to configuration.
+ */
+@Service
+@Slf4j
+public class CreateDatabasesByV3File {
+
+    /**
+     * Replays the CREATE/DROP statements of a dump into {@code databasesName}
+     * as one JDBC batch inside a single transaction.
+     *
+     * @return true on success, false if any statement failed
+     */
+    private static boolean createTable(String databasesName, List<String> sqlStatements, Connection conn) {
+        try {
+            conn.setAutoCommit(false);
+            Statement statement = conn.createStatement();
+            sqlStatements.forEach(sql -> {
+                // Qualify table names with the target database. NOTE(review):
+                // replace(";", "") strips EVERY semicolon in the statement, not
+                // just the trailing one — confirm dumps never embed ';' in DDL.
+                sql = sql.replace("EXISTS `", "EXISTS `" + databasesName + "`.`").replace(";", "");
+                sql = sql.replace("TABLE `", "TABLE `" + databasesName + "`.`").replace(";", "");
+                try {
+                    statement.addBatch(sql);
+
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            int[] i = statement.executeBatch();
+            conn.commit();
+            System.out.println("成功执行语句:" + i.length);
+            return true;
+        } catch (Exception e) {
+            // NOTE(review): failure is only printed, never rolled back, and the
+            // Statement is never closed (resource leak).
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    /**
+     * Creates a new utf8mb4 database. Returns false (after printing) on any
+     * failure, e.g. if the database already exists.
+     * NOTE(review): the parameter is actually the database name despite being
+     * called tableName; the Statement is never closed.
+     */
+    public static boolean createDatabase(String tableName, Connection conn) {
+        try {
+            Statement statement = conn.createStatement();
+            statement.execute("CREATE DATABASE `" + tableName + "` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci");
+
+            System.out.printf("Database '%s' created successfully!%n", tableName);
+            return true;
+        } catch (Exception e) {
+            System.out.printf("Database '%s' created error!%n", tableName);
+        }
+        return false;
+    }
+
+    /**
+     * Loads all pending (status = '0') upload records from the bookkeeping
+     * database.
+     *
+     * @return records awaiting restoration, possibly empty
+     * @throws SQLException if closing a JDBC resource fails
+     */
+    private static List<UploadBigsqlFileRecordBaseVO> queryRecords() throws SQLException {
+        // Connect to MySQL
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        try {
+            // NOTE(review): hardcoded credentials/URL — move to configuration.
+            String url = "jdbc:mysql://192.168.12.241:19400/jiayueCloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true";
+            // Load the JDBC driver
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            conn = DriverManager.getConnection(url, "root", "QJtgNR52+*L^E809-3");
+            String sql = "SELECT * FROM dc_upload_bigsql_file_record d WHERE d.status = '0'";
+            ps = conn.prepareStatement(sql);
+            rs = ps.executeQuery();
+            List<UploadBigsqlFileRecordBaseVO> uploadBigsqlFileRecordList = new ArrayList<>();
+            while (rs.next()) {
+                UploadBigsqlFileRecordBaseVO uploadBigsqlFileRecord = new UploadBigsqlFileRecordBaseVO();
+                uploadBigsqlFileRecord.setId(rs.getInt("id"));
+                uploadBigsqlFileRecord.setFileName(rs.getString("file_name"));
+                uploadBigsqlFileRecord.setRemark(rs.getString("remark"));
+                uploadBigsqlFileRecord.setStatus(rs.getString("status"));
+                uploadBigsqlFileRecordList.add(uploadBigsqlFileRecord);
+            }
+            return uploadBigsqlFileRecordList;
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            // NOTE(review): closing ps before rs; ps.close() also closes its
+            // ResultSet, so the explicit rs.close() below is effectively a no-op.
+            if (ps != null) {
+                ps.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+
+    /**
+     * Writes the processing result (status + created database name) back to the
+     * bookkeeping table.
+     *
+     * NOTE(review): the SQL is built by string concatenation even though a
+     * PreparedStatement is used — bind parameters with '?' instead (SQL
+     * injection risk if status/dbName ever come from user input).
+     */
+    private static void updateRecords(int id, String status, String dbName) throws SQLException {
+        // Connect to MySQL
+        Connection conn = null;
+        PreparedStatement ps = null;
+        try {
+            String url = "jdbc:mysql://192.168.12.241:19400/jiayueCloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true";
+            // Load the JDBC driver
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            conn = DriverManager.getConnection(url, "root", "QJtgNR52+*L^E809-3");
+            String sql = "UPDATE dc_upload_bigsql_file_record set status = '" + status + "' , db_name = '" + dbName + "' WHERE id = " + id + ";";
+            ps = conn.prepareStatement(sql);
+            ps.executeUpdate();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (ps != null) {
+                ps.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+    // Recursively finds files whose name contains the keyword (case-insensitive).
+    // NOTE(review): File.listFiles() returns null for non-directories or on I/O
+    // error, which would NPE below — verify folder always exists.
+    public static File[] searchFile(File folder, final String keyWord) {// recursively search files containing the keyword
+
+        File[] subFolders = folder.listFiles(new FileFilter() {// anonymous inner class used as the filter
+            @Override
+            public boolean accept(File pathname) {// FileFilter.accept implementation
+                // Accept directories (to recurse) and files whose name contains the keyword
+                return pathname.isDirectory()
+                        || (pathname.isFile() && pathname.getName().toLowerCase().contains(keyWord.toLowerCase()));
+            }
+        });
+
+        List<File> result = new ArrayList<File>();// result accumulator
+        for (int i = 0; i < subFolders.length; i++) {// walk the filtered entries
+            if (subFolders[i].isFile()) {// plain file: collect it
+                result.add(subFolders[i]);
+            } else {// directory: recurse and collect everything found below
+                File[] foldResult = searchFile(subFolders[i], keyWord);
+                Collections.addAll(result, foldResult);
+            }
+        }
+
+        File[] files = new File[result.size()];// array sized to the result list
+        result.toArray(files);// list -> array
+        return files;
+    }
+
+    // Splits INSERT statements into per-table CSV files.
+    // NOTE(review): substring(13) assumes each line starts exactly with
+    // "INSERT INTO `" — confirm against the dump format. The FileWriters put
+    // into fileWriterMap are never closed/flushed anywhere, so buffered CSV
+    // rows can be lost before mysqlsh imports them.
+    public static void spliteFileToCsv(String line, Map<String, FileWriter> fileWriterMap, String csvFilePath) {
+        String outputFilePath = ".csv";
+        try {
+            if (line.trim().startsWith("INSERT")) {
+                String tableName = line.substring(13, line.indexOf("` VALUES"));
+                if (null == fileWriterMap.get(tableName)) {
+                    fileWriterMap.put(tableName, new FileWriter(csvFilePath + tableName + outputFilePath));
+                }
+                // Parse the SQL statement and extract the values
+                String[] data = parseSQLLine(line);
+
+                if (data != null) {
+                    // Append the values as one CSV row
+                    fileWriterMap.get(tableName).write(String.join(",", data));
+                    fileWriterMap.get(tableName).write("\n");
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    // Extracts the value list of an INSERT statement.
+    // NOTE(review): only the FIRST "(...)" group is parsed, quotes are stripped,
+    // and splitting on ',' breaks values that themselves contain commas.
+    private static String[] parseSQLLine(String line) {
+        if (line.trim().startsWith("INSERT")) {
+            // Pull the text between the first '(' and the first ')'
+            int startIndex = line.indexOf("(");
+            int endIndex = line.indexOf(")");
+            if (startIndex == -1 || endIndex == -1) {
+                return null;
+            }
+            String data = line.substring(startIndex + 1, endIndex).replaceAll("'", "");
+
+            return data.replace(", ", ",").replace("\\", "").split(",");
+        }
+        return null;
+
+    }
+
+    /**
+     * Main entry: reads pending records, unzips each uploaded archive, creates
+     * the database and tables, converts INSERTs to CSV and bulk-imports them,
+     * then archives the zip and updates the record status.
+     */
+    public void redSqlZip() throws Exception {
+        // Pending (not yet restored) SQL archives
+        List<UploadBigsqlFileRecordBaseVO> list = queryRecords();
+        log.info("找到" + list.size() + "条待恢复记录");
+        // Directory where uploaded SQL zip archives are stored
+        //String path = "/Users/xiaowang/个人/";
+        String path = "/home/bigsql/";
+        Map<String, FileWriter> fileWriterMap = new HashMap<>();
+        File file = new File(path);
+        for (UploadBigsqlFileRecordBaseVO uploadBigsqlFileRecordBaseVO : list) {
+            File[] files = searchFile(file, uploadBigsqlFileRecordBaseVO.getFileName());
+            // Locate the archive by file name; normally zero or one match
+            for (File f : files) {
+                log.info("开始解析sql文件,文件名:" + f.getName());
+                // Station code = first 6 chars of the file name
+                String name = f.getName().substring(0, 6);
+                //String csvFilePath = "/Users/xiaowang/个人/csvdir/" + name + File.separator;
+                String csvFilePath = "/home/bigsql/csvdir/" + name + File.separator;
+                String sqlPath = "/home/bigsql/sqldir/";
+                // Create the CSV output directory if it does not exist yet
+                if (!new File(csvFilePath).exists()) {
+                    new File(csvFilePath).mkdirs();
+                }
+                if (!new File(sqlPath).exists()) {
+                    new File(sqlPath).mkdirs();
+                }
+                String databasesName = "";
+                // NOTE(review): this stream is opened but never read — only
+                // closed below; it can likely be removed.
+                BufferedInputStream input = new BufferedInputStream(Files.newInputStream(Paths.get(f.getPath())));
+                // Unzip, then scan the extraction dir for .sql files (normally one)
+                ZipUtil.unzip(f.getPath(), sqlPath, StandardCharsets.UTF_8);
+                File sFile = new File(sqlPath);
+                File[] sFiles = searchFile(sFile, ".sql");
+                if (sFiles.length == 0) {
+                    // NOTE(review): 'break' abandons ALL remaining records of this
+                    // loop — 'continue' was probably intended; 'input' also leaks here.
+                    log.error(f.getName() + "压缩文件为空");
+                    break;
+                }
+                // Read the dump line by line
+                try {
+                    try (BufferedReader reader = new BufferedReader(new FileReader(sFiles[0]))) {
+                        List<String> sqlStatements = new ArrayList<>();
+                        StringBuilder statementBuilder = new StringBuilder();
+                        String line;
+                        int lineSize = 0;
+                        while ((line = reader.readLine()) != null) {
+                            // Comment/INSERT lines are not DDL: count them and
+                            // route INSERTs into per-table CSV files
+                            if (line.startsWith("Navicat") || line.startsWith("/")
+                                    || line.startsWith("INSERT") || line.startsWith("--") || line.startsWith("#")
+                                    || line.startsWith("//")) {
+                                lineSize++;
+                                // Convert the INSERT payload into CSV
+                                spliteFileToCsv(line, fileWriterMap, csvFilePath);
+                                continue; // Skip comments
+                            }
+                            if (line.endsWith(";")) {
+                                statementBuilder.append(line); // Remove semicolon
+                                sqlStatements.add(statementBuilder.toString().trim()); // Add to list
+                                statementBuilder.setLength(0); // Reset statement buffer
+                            } else {
+                                statementBuilder.append(line).append(" ");
+                            }
+                        }
+                        // Keep only DDL; database name = ipfcst_<station>_<timestamp>
+                        sqlStatements = sqlStatements.stream().filter(s -> s.startsWith("CREATE") || s.startsWith("DROP")).collect(Collectors.toList());
+                        databasesName = "ipfcst_" + name + "_" + DateUtil.format(new Date(), DatePattern.PURE_DATETIME_FORMAT);
+                        Connection conn = JyDbUtil.conn;
+                        if (createDatabase(databasesName, conn)) {
+                            if (createTable(databasesName, sqlStatements, conn)) {
+                                log.info("执行数据库和数据表创建成功,其中数据共:" + lineSize);
+                            }
+                        }
+                    } catch (IOException e) {
+                        // Mark the record failed ("2") before propagating
+                        updateRecords(uploadBigsqlFileRecordBaseVO.getId(), "2", databasesName);
+                        throw new RuntimeException("读取Sql文件失败:  " + f.getName(), e);
+                    }
+
+
+                    input.close();
+                    // Mark the record restored ("1") in the bookkeeping table
+                    updateRecords(uploadBigsqlFileRecordBaseVO.getId(), "1", databasesName);
+                } catch (Exception e) {
+                    log.info("读取zip文件时异常" + e);
+                } finally {
+                    input.close();
+                }
+
+                log.info("表结构导入完毕,开始进行表数据导入");
+
+                // Bulk-import every generated CSV file into the new database.
+                // NOTE(review): the writers in fileWriterMap are never closed, so
+                // the CSVs may still hold unflushed rows at this point.
+                File csvFile = new File(csvFilePath);
+                File[] csvFiles = csvFile.listFiles();
+                if (csvFiles != null) {
+                    for (File csv : csvFiles) {
+                        insertData(databasesName, csv);
+                    }
+                }
+                log.info("数据导入完毕!!!");
+                // Clean up work dirs and archive the processed zip under backUps<date>/
+                FileUtil.del(sqlPath);
+                FileUtil.del(csvFilePath);
+                FileUtil.move(f, new File(path + File.separator + "backUps" + DateUtil.format(new Date(), DatePattern.PURE_DATE_PATTERN) + File.separator + f.getName()), true);
+            }
+        }
+    }
+
+    /**
+     * Imports one CSV file into the given database via `mysqlsh util import-table`
+     * executed over SSH. The table name is derived from the CSV file name.
+     * NOTE(review): SSH host and both passwords are hardcoded.
+     */
+    private void insertData(String databasesName, File table) throws Exception {
+
+        String[] tableNames = table.getName().split("\\.");
+        log.info(tableNames[0] + "表数据开始向" + databasesName + "数据库导入");
+        ExecuteShellUtil executeShellUtil = ExecuteShellUtil.getInstance();
+        executeShellUtil.init("192.168.12.10", 22, "root", "Jydl*3377");
+        // Table name comes from the CSV file name (part before the first '.')
+        executeShellUtil.execCmd("mysqlsh mysql://root:mysql_T7yN3E@127.0.0.1:19306 -- util import-table " + table.getPath() + " --schema=" + databasesName + " --table=" + tableNames[0] + " --dialect=csv-unix --threads=10");
+        //executeShellUtil.execCmd("mysqlsh mysql://root:123456@127.0.0.1:3306 -- util import-table D:\\bigsql\\csvdir\\J00307\\" + table.getName() + " --schema=" + databasesName + " --table=" + tableNames[0] + " --dialect=csv-unix --threads=10");
+        log.info("表" + tableNames[0] + "导入完毕");
+        executeShellUtil.close();
+    }
+}

+ 116 - 0
src/main/java/com/example/bigsql/service/ImportBigSqlV3File.java

@@ -0,0 +1,116 @@
+package com.example.bigsql.service;
+
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.thread.ExecutorBuilder;
+import com.example.bigsql.util.JyDbUtil;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Command-line importer for large v3-format SQL dumps: reads INSERT lines in
+ * batches and executes each batch on a thread pool.
+ *
+ * Usage: args[0] = dump file path, args[1] = target database name,
+ * args[2] = batch size (default 20000).
+ *
+ * NOTE(review): the single shared {@code Connection} (and its commit) is used
+ * from multiple pool threads concurrently — JDBC connections are not
+ * thread-safe, so batches can interleave or fail. Also: any trailing partial
+ * batch (fewer than batchSize lines at EOF) is never flushed, and the executor
+ * is never shut down, so the JVM will not exit on its own.
+ */
+
+public class ImportBigSqlV3File {
+    // Rows handed to the executor that completed successfully.
+    static AtomicInteger isSaveSize = new AtomicInteger(0);
+    // Rows read from the file and queued so far.
+    static AtomicInteger totalSize = new AtomicInteger(0);
+
+    public static void main(String[] args) {
+        String filePath = args[0];
+        String databaseName = args[1];
+        System.out.println("单读取开始时间:" + DateUtil.now());
+        List<String> dataLinkList = Collections.synchronizedList(new ArrayList<>());
+        int batchSize = Convert.toInt(args[2], 20000);
+
+
+        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+
+            ExecutorService executor = ExecutorBuilder.create()//
+                    .setCorePoolSize(20)//
+                    .setMaxPoolSize(40)//
+                    .setKeepAliveTime(0)//
+                    .build();
+            Connection conn = JyDbUtil.conn;
+            conn.setAutoCommit(false);
+            String line;
+            while ((line = reader.readLine()) != null) {
+
+                if (line.startsWith("INSERT")) {
+                    dataLinkList.add(line);
+
+                    if (dataLinkList.size() == batchSize) {
+                        System.out.println("=========当前线程名==========" + Thread.currentThread().getName() + "===================");
+                        System.out.println("插入开始时间:" + DateUtil.now());
+
+                        // NOTE(review): Statement is created on the reader thread
+                        // but used/committed on a pool thread below.
+                        Statement statement = conn.createStatement();
+                        System.out.println("创建数据库链接:" + DateUtil.now());
+                        totalSize.set(totalSize.get() + batchSize);
+
+                        System.out.println("总数累计:" + DateUtil.now());
+                        // Snapshot the batch so the shared list can be cleared and refilled
+                        List<String> copyFruits = dataLinkList.stream().collect(Collectors.toList());
+                        System.out.println("复制list完成时间:" + DateUtil.now());
+                        System.out.println("读取插入语句有" + batchSize + "条,执行批量插入新线程");
+
+                        executor.submit(new Runnable() {
+                            // Logs the current worker thread's name for tracing
+
+                            @Override
+                            public void run() {
+                                try {
+                                    copyFruits.forEach(sql -> {
+                                        try {
+                                            // Qualify the table with the target database;
+                                            // strips every ';' in the statement
+                                            sql = sql.replace("INTO `", "INTO `" + databaseName + "`.`").replace(";", "");
+                                            statement.addBatch(sql);
+                                        } catch (SQLException ex) {
+                                            throw new RuntimeException(ex);
+                                        }
+                                    });
+
+                                    int[] i = statement.executeBatch();
+                                    // NOTE(review): commit on the shared connection also
+                                    // commits batches in flight on other threads.
+                                    conn.commit();
+                                    System.out.println("=========当前线程名==========" + Thread.currentThread().getName() + "===================");
+                                    isSaveSize.set(isSaveSize.get() + i.length);
+                                    System.out.println("已读取:" + totalSize.get() + "已插入:" + isSaveSize.get() + " 完成:" + bs(isSaveSize.get(), totalSize.get()) + "%");
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                } finally {
+                                    try {
+                                        copyFruits.clear();
+                                        statement.clearBatch();
+                                        statement.close();
+                                    } catch (SQLException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                }
+                            }
+                        });
+                        dataLinkList.clear();
+                    }
+
+                }
+            }
+
+        } catch (
+                Exception e) {
+            throw new RuntimeException("Failed to read SQL file: " + filePath, e);
+        }
+    }
+
+    /**
+     * Percentage helper: returns round(a/b, 2) * 100 as an int (e.g. 50 for a=1, b=2).
+     */
+    public static int bs(int a, int b) {
+        return (int) ((new BigDecimal((float) a / b).setScale(2, RoundingMode.HALF_UP).doubleValue()) * 100);
+    }
+
+}

+ 146 - 0
src/main/java/com/example/bigsql/service/JyDealBigSqlFile.java

@@ -0,0 +1,146 @@
+package com.example.bigsql.service;
+
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateUtil;
+import com.example.bigsql.util.JyDbUtil;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * One-off command-line variant of the dump restore: reads a local SQL dump,
+ * creates the database/tables, and splits the INSERT rows into per-table CSV
+ * files. Largely duplicates CreateDatabasesByV3File — consider consolidating.
+ */
+@Slf4j
+public class JyDealBigSqlFile {
+    public static void main(String[] args) {
+        log.info("开始处理时间:{}", DateUtil.now());
+        String stationCode = "J00300";
+        // 1. Read the dump, create the database, and split INSERTs into CSVs
+        String filePath = "E:\\迅雷下载\\ipfcst-vv3.sql";
+        String csvFilePath = "E:\\迅雷下载\\csvdir\\";
+        // NOTE(review): these FileWriters are never closed/flushed, so the CSV
+        // files may end up truncated.
+        Map<String, FileWriter> fileWriterMap = new HashMap<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+            List<String> sqlStatements = new ArrayList<>();
+            StringBuilder statementBuilder = new StringBuilder();
+            String line;
+            int lineSize = 0;
+            while ((line = reader.readLine()) != null) {
+
+                // Comment/INSERT lines are not DDL: count them and route
+                // INSERTs into per-table CSV files
+                if (line.startsWith("Navicat") || line.startsWith("/")
+                        || line.startsWith("INSERT") || line.startsWith("--") || line.startsWith("#")
+                        || line.startsWith("//")) {
+                    lineSize++;
+                    spliteFileToCsv(line, fileWriterMap, csvFilePath);
+                    continue; // Skip comments
+                }
+                if (line.endsWith(";")) {
+                    statementBuilder.append(line); // Remove semicolon
+                    sqlStatements.add(statementBuilder.toString().trim()); // Add to list
+                    statementBuilder.setLength(0); // Reset statement buffer
+                } else {
+                    statementBuilder.append(line).append(" ");
+                }
+            }
+            // Keep only DDL; name the database ipfcst_<station>_<timestamp>
+            sqlStatements = sqlStatements.stream().filter(s -> s.startsWith("CREATE") || s.startsWith("DROP")).collect(Collectors.toList());
+
+            String databasesName = "ipfcst_" + stationCode + "_" + DateUtil.format(new Date(), DatePattern.PURE_DATETIME_FORMAT);
+
+            Connection conn = JyDbUtil.conn;
+            if (createDatabase(databasesName, conn)) {
+                if (createTable(databasesName, sqlStatements, conn)) {
+                    log.info("执行数据库和数据表创建成功,其中数据共:" + lineSize);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read SQL file: " + filePath, e);
+        }
+
+        log.info("完成处理时间:{}", DateUtil.now());
+    }
+
+
+    // Splits INSERT statements into per-table CSV files.
+    // NOTE(review): substring(13) assumes lines start exactly with
+    // "INSERT INTO `"; writers created here are never closed by anyone.
+    public static void spliteFileToCsv(String line, Map<String, FileWriter> fileWriterMap, String csvFilePath) {
+        String outputFilePath = ".csv";
+        try {
+            if (line.trim().startsWith("INSERT")) {
+                String tableName = line.substring(13, line.indexOf("` VALUES"));
+                if (null == fileWriterMap.get(tableName)) {
+                    fileWriterMap.put(tableName, new FileWriter(csvFilePath + tableName + outputFilePath));
+                }
+                // Parse the SQL statement and extract the values
+                String[] data = parseSQLLine(line);
+
+                if (data != null) {
+                    // Append the values as one CSV row
+                    fileWriterMap.get(tableName).write(String.join(",", data));
+                    fileWriterMap.get(tableName).write("\n");
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    // Extracts the value list of an INSERT statement; only the FIRST "(...)"
+    // group is parsed, and splitting on ',' breaks values containing commas.
+    private static String[] parseSQLLine(String line) {
+        if (line.trim().startsWith("INSERT")) {
+            // Pull the text between the first '(' and the first ')'
+            int startIndex = line.indexOf("(");
+            int endIndex = line.indexOf(")");
+            if (startIndex == -1 || endIndex == -1) {
+                return null;
+            }
+            String data = line.substring(startIndex + 1, endIndex).replaceAll("'", "");
+
+            return data.replace(", ", ",").replace("\\", "").split(",");
+        }
+        return null;
+
+    }
+
+
+    // Creates a new utf8mb4 database; failures are only logged (e.g. if it
+    // already exists). NOTE(review): the Statement is never closed.
+    public static boolean createDatabase(String tableName, Connection conn) {
+        try {
+            Statement statement = conn.createStatement();
+            statement.execute("CREATE DATABASE `" + tableName + "` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci");
+
+            log.info(String.format("Database '%s' created successfully!", tableName));
+            return true;
+        } catch (Exception e) {
+            log.info(String.format("Database '%s' created error!", tableName));
+        }
+        return false;
+    }
+
+    // Replays the DDL statements as one batch in a single transaction.
+    // NOTE(review): no rollback on failure; Statement never closed.
+    private static boolean createTable(String databasesName, List<String> sqlStatements, Connection conn) {
+        try {
+            conn.setAutoCommit(false);
+            Statement statement = conn.createStatement();
+            sqlStatements.forEach(sql -> {
+                // Qualify table names with the target database; replace(";", "")
+                // strips every semicolon, not just the trailing one
+                sql = sql.replace("EXISTS `", "EXISTS `" + databasesName + "`.`").replace(";", "");
+                sql = sql.replace("TABLE `", "TABLE `" + databasesName + "`.`").replace(";", "");
+                try {
+                    statement.addBatch(sql);
+
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            int[] i = statement.executeBatch();
+            conn.commit();
+            log.info("成功执行语句:" + i.length);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+}

+ 66 - 0
src/main/java/com/example/bigsql/service/SQLtoCSVConverter.java

@@ -0,0 +1,66 @@
+package com.example.bigsql.service;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * One-off tool that splits the INSERT statements of a large SQL dump into
+ * per-table CSV files (one writer per table, keyed by table name).
+ *
+ * Fixes over the original: the reader is closed via try-with-resources, the
+ * per-table writers are closed (and thereby flushed) in a finally block, and
+ * the null result of parseSQLLine is checked BEFORE it is dereferenced
+ * (the original read data.length first and could throw NPE).
+ */
+public class SQLtoCSVConverter {
+
+    public static void main(String[] args) {
+
+        String inputFilePath = "E:\\迅雷下载\\ipfcst-vv3.sql";
+        String outputFilePath = ".csv";
+
+        Map<String, FileWriter> fileWriterMap = new HashMap<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(inputFilePath))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (!line.trim().startsWith("INSERT")) {
+                    continue;
+                }
+                // Assumes lines start exactly with "INSERT INTO `" (13 chars).
+                String tableName = line.substring(13, line.indexOf("` VALUES"));
+                if (null == fileWriterMap.get(tableName)) {
+                    fileWriterMap.put(tableName, new FileWriter("E:\\迅雷下载\\csvdir\\" + tableName + outputFilePath));
+                }
+                // Parse the value list; skip lines without a "(...)" group.
+                String[] data = parseSQLLine(line);
+                if (data == null) {
+                    continue;
+                }
+                // Map the literal string "null" to a null element. String.join
+                // renders null as "null" again, so output matches the original;
+                // kept for parity rather than effect.
+                for (int i = 0; i < data.length; i++) {
+                    if (data[i].equals("null")) {
+                        data[i] = null;
+                    }
+                }
+                // Append the values as one CSV row.
+                fileWriterMap.get(tableName).write(String.join(",", data));
+                fileWriterMap.get(tableName).write("\n");
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            // Close (and flush) every per-table writer; the original leaked
+            // them, risking truncated CSV output.
+            for (FileWriter writer : fileWriterMap.values()) {
+                try {
+                    writer.close();
+                } catch (IOException ignored) {
+                    // best-effort cleanup on shutdown
+                }
+            }
+        }
+    }
+
+    /**
+     * Extracts the value list of a single INSERT statement.
+     *
+     * @param line one line of the dump
+     * @return the values between the first '(' and first ')' with quotes and
+     *         backslashes stripped, or null if the line is not a parseable INSERT
+     */
+    private static String[] parseSQLLine(String line) {
+        if (line.trim().startsWith("INSERT")) {
+            int startIndex = line.indexOf("(");
+            int endIndex = line.indexOf(")");
+            if (startIndex == -1 || endIndex == -1) {
+                return null;
+            }
+            String data = line.substring(startIndex + 1, endIndex).replaceAll("'", "");
+
+            return data.replace(", ", ",").replace("\\", "").split(",");
+        }
+        return null;
+
+    }
+}

+ 118 - 0
src/main/java/com/example/bigsql/util/ExecuteShellUtil.java

@@ -0,0 +1,118 @@
+package com.example.bigsql.util;
+
+import com.jcraft.jsch.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+/**
+ * Executes shell commands on a remote Linux host over SSH (JSch "exec" channel).
+ *
+ * <p>Usage: {@code getInstance()} → {@code init(...)} → {@code execCmd(...)} →
+ * {@code close()}. An instance holds one live SSH session between init and close
+ * and is not thread-safe.
+ *
+ * @author JustryDeng
+ * @date 2019/4/29 16:29
+ */
+public class ExecuteShellUtil {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteShellUtil.class);
+    /**
+     * Error message thrown when execCmd is called before init(...).
+     */
+    private static final String DONOT_INIT_ERROR_MSG = "please invoke init(...) first!";
+    /**
+     * Connect timeout for the SSH session, in milliseconds.
+     */
+    private static final int CONNECT_TIMEOUT_MILLIS = 60 * 1000;
+    private Session session;
+    private Channel channel;
+    private ChannelExec channelExec;
+
+    private ExecuteShellUtil() {
+    }
+
+    // NOTE(review): demo entry point with hard-coded host credentials; do not keep
+    // real passwords in source — move them to configuration or a secret store.
+    public static void main(String[] args) throws Exception {
+        ExecuteShellUtil executeShellUtil = ExecuteShellUtil.getInstance();
+        executeShellUtil.init("192.168.1.48", 22, "administrator", "Jydl*3377");
+        String result = executeShellUtil.execCmd("mysqlsh mysql://root:123456@127.0.0.1:3306 -- util import-table D:\\bigsql\\csvdir\\J00307\\t_wind_tower_status_data.csv --schema=ipfcst_J00307_20240315124741 --table=t_wind_tower_status_data --dialect=csv-unix --threads=10");
+        System.out.println(result);
+        executeShellUtil.close();
+    }
+
+    /**
+     * Factory method for ExecuteShellUtil instances.
+     *
+     * @return a fresh, uninitialised instance
+     * @date 2019/4/29 16:58
+     */
+    public static ExecuteShellUtil getInstance() {
+        return new ExecuteShellUtil();
+    }
+
+    /**
+     * Opens the SSH session and the "exec" channel.
+     *
+     * @param ip       remote Linux host address
+     * @param port     SSH port
+     * @param username login user
+     * @param password login password
+     * @throws JSchException when the session cannot be established
+     * @date 2019/3/15 12:41
+     */
+    public void init(String ip, Integer port, String username, String password) throws JSchException {
+        JSch jsch = new JSch();
+        // BUG FIX: the original called jsch.getSession(...) twice and discarded the
+        // first Session object; exactly one session is created now.
+        session = jsch.getSession(username, ip, port);
+        session.setPassword(password);
+        Properties sshConfig = new Properties();
+        // Skip host-key verification — acceptable for trusted internal hosts only.
+        sshConfig.put("StrictHostKeyChecking", "no");
+        session.setConfig(sshConfig);
+        session.connect(CONNECT_TIMEOUT_MILLIS);
+        LOGGER.info("Session connected!");
+        // Open the channel used to execute shell commands.
+        channel = session.openChannel("exec");
+        channelExec = (ChannelExec) channel;
+    }
+
+    /**
+     * Executes one command and returns everything the remote process wrote to
+     * stdout (each line prefixed with '\n'); stderr is forwarded to System.err.
+     *
+     * @param command the shell command line to run
+     * @return the captured stdout of the command
+     * @throws Exception when init(...) was not called first or the channel fails
+     */
+    public String execCmd(String command) throws Exception {
+        if (session == null || channel == null || channelExec == null) {
+            throw new Exception(DONOT_INIT_ERROR_MSG);
+        }
+        LOGGER.info("execCmd command - > {}", command);
+        channelExec.setCommand(command);
+        channel.setInputStream(null);
+        channelExec.setErrStream(System.err);
+        channel.connect();
+        StringBuilder sb = new StringBuilder(16);
+        try (InputStream in = channelExec.getInputStream();
+             InputStreamReader isr = new InputStreamReader(in, StandardCharsets.UTF_8);
+             BufferedReader reader = new BufferedReader(isr)) {
+            String buffer;
+            while ((buffer = reader.readLine()) != null) {
+                sb.append("\n").append(buffer);
+            }
+            LOGGER.info("execCmd result - > {}", sb);
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Releases the channel and session; safe to call repeatedly.
+     *
+     * @date 2019/3/15 12:47
+     */
+    public void close() {
+        if (channelExec != null && channelExec.isConnected()) {
+            channelExec.disconnect();
+        }
+        if (channel != null && channel.isConnected()) {
+            channel.disconnect();
+        }
+        if (session != null && session.isConnected()) {
+            session.disconnect();
+        }
+    }
+
+}

+ 59 - 0
src/main/java/com/example/bigsql/util/JyDbUtil.java

@@ -0,0 +1,59 @@
+package com.example.bigsql.util;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * Holds the static JDBC connections used while importing dumped SQL files.
+ *
+ * <p>The connection to DB_URL is opened eagerly in the static initialiser; the
+ * 172 connection is opened on demand via {@link #conn172()}.
+ *
+ * <p>NOTE(review): credentials are hard-coded in source — move them to external
+ * configuration; shared static Connections are also not thread-safe.
+ */
+public class JyDbUtil {
+    // JDBC driver class and database URLs.
+    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+    static final String DB_URL = "jdbc:mysql://192.168.12.10:19306/sys?allowLoadLocalInfile=true&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&autoReconnect=true&rewriteBatchedStatements=true&serverTimezone=UTC&allowLoadInfile=true";
+    static final String DB_URL_172 = "jdbc:mysql://192.168.10.172:3306/sys?allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&autoReconnect=true&rewriteBatchedStatements=true&serverTimezone=UTC";
+
+    // Database user names and passwords.
+    static final String USER = "root";
+    static final String USER_172 = "root";
+    //static final String PASS = "LbHg0ABh20a5nBnm";
+    static final String PASS = "mysql_T7yN3E";
+    static final String PASS_172 = "!QAZ2root";
+    public static Connection conn = null;
+    public static Connection conn_172 = null;
+
+    static {
+        conn();
+        //conn172();
+    }
+
+    /**
+     * Opens the primary connection and stores it in {@link #conn}.
+     * Failures are logged to stderr; {@code conn} stays null in that case.
+     */
+    public static void conn() {
+        try {
+            Class.forName(JDBC_DRIVER);
+            conn = DriverManager.getConnection(DB_URL, USER, PASS);
+        } catch (Exception e) {
+            // BUG FIX: the original called conn.rollback() here, but when
+            // getConnection itself fails `conn` is still null, which caused a
+            // guaranteed NullPointerException (and there is nothing to roll back
+            // on a connection that was never opened).
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Opens the 172-host connection and stores it in {@link #conn_172}.
+     * Failures are logged to stderr; {@code conn_172} stays null in that case.
+     */
+    public static void conn172() {
+        try {
+            Class.forName(JDBC_DRIVER);
+            conn_172 = DriverManager.getConnection(DB_URL_172, USER_172, PASS_172);
+        } catch (Exception e) {
+            // BUG FIX: same null-rollback NPE as in conn() removed.
+            e.printStackTrace();
+        }
+    }
+
+
+}

+ 246 - 0
src/main/java/com/example/bigsql/util/UnZipUtil.java

@@ -0,0 +1,246 @@
+package com.example.bigsql.util;
+
+import com.github.junrar.Archive;
+import com.github.junrar.rarfile.FileHeader;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
+
+public class UnZipUtil {
+    //解压.zip文件
+    public static void unZip(String sourceFile, String outputDir) throws IOException {
+        ZipFile zipFile = null;
+        File file = new File(sourceFile);
+        try {
+            Charset CP866 = StandardCharsets.UTF_8;  //specifying alternative (non UTF-8) charset
+            zipFile = new ZipFile(file, Charset.forName("GBK"));
+            createDirectory(outputDir, null);//创建输出目录
+
+            Enumeration<?> enums = zipFile.entries();
+            while (enums.hasMoreElements()) {
+
+                ZipEntry entry = (ZipEntry) enums.nextElement();
+                System.out.println("解压." + entry.getName());
+
+                if (entry.isDirectory()) {//是目录
+                    createDirectory(outputDir, entry.getName());//创建空目录
+                } else {//是文件
+                    File tmpFile = new File(outputDir + "/" + entry.getName());
+                    createDirectory(tmpFile.getParent() + "/", null);//创建输出目录
+
+                    try (InputStream in = zipFile.getInputStream(entry); OutputStream out = Files.newOutputStream(tmpFile.toPath())) {
+                        int length = 0;
+
+                        byte[] b = new byte[2048];
+                        while ((length = in.read(b)) != -1) {
+                            out.write(b, 0, length);
+                        }
+
+                    }
+                }
+            }
+
+        } catch (IOException e) {
+            throw new IOException("解压缩文件出现异常", e);
+        } finally {
+            try {
+                if (zipFile != null) {
+                    zipFile.close();
+                }
+            } catch (IOException ex) {
+                throw new IOException("关闭zipFile出现异常", ex);
+            }
+        }
+    }
+
+    public static void unzip1(File file, String outputDir) throws IOException {
+        File outFile = null;   // 输出文件的时候要有文件夹的操作
+        ZipFile zipFile = new ZipFile(file.getPath());   // 实例化ZipFile对象
+        ZipInputStream zipInput = null;    // 定义压缩输入流
+
+        //定义解压的文件名
+        OutputStream out = null;   // 定义输出流,用于输出每一个实体内容
+        InputStream input = null;  // 定义输入流,读取每一个ZipEntry
+        ZipEntry entry = null; // 每一个压缩实体
+        zipInput = new ZipInputStream(new FileInputStream(file));  // 实例化ZIpInputStream
+
+        //遍历压缩包中的文件
+        while ((entry = zipInput.getNextEntry()) != null) { // 得到一个压缩实体
+            System.out.println("解压缩" + entry.getName() + "文件");
+            outFile = new File(outputDir + File.separator + entry.getName());   // 定义输出的文件路径
+            if (!outFile.getParentFile().exists()) {  // 如果输出文件夹不存在
+                outFile.getParentFile().mkdirs();
+                // 创建文件夹 ,如果这里的有多级文件夹不存在,请使用mkdirs()
+                // 如果只是单纯的一级文件夹,使用mkdir()就好了
+            }
+            if (!outFile.exists()) {  // 判断输出文件是否存在
+                if (entry.isDirectory()) {
+                    outFile.mkdirs();
+                    System.out.println("create directory...");
+                } else {
+                    outFile.createNewFile();   // 创建文件
+                    System.out.println("create file...");
+                }
+            }
+            if (!entry.isDirectory()) {
+                input = zipFile.getInputStream(entry); // 得到每一个实体的输入流
+                out = new FileOutputStream(outFile);   // 实例化文件输出流
+                int temp = 0;
+                while ((temp = input.read()) != -1) {
+                    out.write(temp);
+                }
+                input.close();     // 关闭输入流
+                out.close();   // 关闭输出流
+            }
+
+        }
+        input.close();
+    }
+
+    /**
+     * 构建目录
+     *
+     * @param outputDir
+     * @param subDir
+     */
+    public static void createDirectory(String outputDir, String subDir) {
+        File file = new File(outputDir);
+        if (!(subDir == null || subDir.trim().equals(""))) {//子目录不为空
+            file = new File(outputDir + "/" + subDir);
+        }
+        if (!file.exists()) {
+            if (!file.getParentFile().exists())
+                file.getParentFile().mkdirs();
+            file.mkdirs();
+        }
+    }
+
+
+    //解压.rar文件
+    public static void unRar(String sourceFile, String outputDir) throws Exception {
+        Archive archive = null;
+        FileOutputStream fos = null;
+        File file = new File(sourceFile);
+        try {
+            archive = new Archive(file);
+            FileHeader fh = archive.nextFileHeader();
+            int count = 0;
+            File destFileName = null;
+            while (fh != null) {
+                System.out.println((++count) + ") " + fh.getFileNameString());
+                String compressFileName = fh.getFileNameString().trim();
+                destFileName = new File(outputDir + "/" + compressFileName);
+                if (fh.isDirectory()) {
+                    if (!destFileName.exists()) {
+                        destFileName.mkdirs();
+                    }
+                    fh = archive.nextFileHeader();
+                    continue;
+                }
+                if (!destFileName.getParentFile().exists()) {
+                    destFileName.getParentFile().mkdirs();
+                }
+                fos = new FileOutputStream(destFileName);
+                archive.extractFile(fh, fos);
+                fos.close();
+                fos = null;
+                fh = archive.nextFileHeader();
+            }
+
+            archive.close();
+            archive = null;
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            if (fos != null) {
+                try {
+                    fos.close();
+                    fos = null;
+                } catch (Exception e) {
+                    //ignore
+                }
+            }
+            if (archive != null) {
+                try {
+                    archive.close();
+                    archive = null;
+                } catch (Exception e) {
+                    //ignore
+                }
+            }
+        }
+    }
+
+
+    //解压.gz文件
+    public static void unGz(String sourceFile, String outputDir) {
+        String ouputfile = "";
+        try {
+            //建立gzip压缩文件输入流
+            FileInputStream fin = new FileInputStream(sourceFile);
+            //建立gzip解压工作流
+            GZIPInputStream gzin = new GZIPInputStream(fin);
+            //建立解压文件输出流
+            /*ouputfile = sourceFile.substring(0,sourceFile.lastIndexOf('.'));
+            ouputfile = ouputfile.substring(0,ouputfile.lastIndexOf('.'));*/
+            File file = new File(sourceFile);
+            String fileName = file.getName();
+            outputDir = outputDir + "/" + fileName.substring(0, fileName.lastIndexOf('.'));
+            FileOutputStream fout = new FileOutputStream(outputDir);
+
+            int num;
+            byte[] buf = new byte[1024];
+
+            while ((num = gzin.read(buf, 0, buf.length)) != -1) {
+                fout.write(buf, 0, num);
+            }
+
+            gzin.close();
+            fout.close();
+            fin.close();
+        } catch (Exception ex) {
+            System.err.println(ex);
+        }
+    }
+
+    /*public static void main(String[] args) throws IOException {
+        ZipFile zipFile = new ZipFile("/Users/xiaowang/Downloads/J00307.zip");
+    }*/
+    public static void main(String[] args) throws IOException, SQLException {
+        //连接mysql
+        Connection conn = null;
+        PreparedStatement ps = null;
+
+        try {
+            String url = "jdbc:mysql://192.168.1.119:14000/jiayueCloud_local?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true";
+            //加载数据库驱动
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            conn = DriverManager.getConnection(url, "root", "M*R7Dp+H4ey2Y^319@");
+            //String sql = "UPDATE dc_upload_bigsql_file_record set status = '" + status + "' , db_name = '" + dbName + "' WHERE id = " + id + ";";
+            String sql = "UPDATE dc_upload_bigsql_file_record set status = '" + "1" + "' , db_name = '" + "ipfcst_J00307_20202" + "' WHERE id = " + 30001 + ";";
+            ps = conn.prepareStatement(sql);
+            ps.executeUpdate();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (ps != null) {
+                ps.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+}

+ 0 - 13
src/main/java/com/example/datadump/DataDumpApplication.java

@@ -1,13 +0,0 @@
-package com.example.datadump;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-@SpringBootApplication
-public class DataDumpApplication {
-
-    public static void main(String[] args) {
-        SpringApplication.run(DataDumpApplication.class, args);
-    }
-
-}

+ 108 - 0
src/main/java/com/example/datadump/service/TDEngineService1.java

@@ -0,0 +1,108 @@
+package com.example.datadump.service;
+
+import com.example.datadump.util.DateTimeUtil;
+import com.taosdata.jdbc.TSDBDriver;
+
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.*;
+
+/**
+ * One-off migration tool: for every wind-measurement device with
+ * equipment_type = '1' (skipping the first 64) and every measuring height
+ * (10..200 m in 10 m steps), copies each day's 23:59 record in TDengine into
+ * the following day's 00:00 slot of the same table.
+ *
+ * <p>NOTE(review): the MySQL Statement, the TDengine Connection/Statement and
+ * the ResultSets are not closed on all paths, rs.close() runs after
+ * mysqlConn.close(), and credentials are hard-coded — acceptable only for a
+ * manual one-shot run.
+ */
+public class TDEngineService1 {
+
+    public static void main(String[] args) throws Exception {
+        // --- MySQL: fetch the device list ---
+        Connection mysqlConn = null;
+        ResultSet rs = null;
+        String SQL = null;
+        String url = "jdbc:mysql://" + "192.168.1.119" + ":" + "14000" + "/" + "jiayueCloud" + "?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true";
+        // Load the JDBC driver.
+        Class.forName("com.mysql.cj.jdbc.Driver");
+        // Open the connection.
+        mysqlConn = DriverManager.getConnection(url, "root", "M*R7Dp+H4ey2Y^319@");
+        Statement statement = mysqlConn.createStatement();
+        SQL = "SELECT \r\n" +
+                "   *\r\n" +
+                "FROM  stormbringer_wind_tower_info \r\n" +
+                " where equipment_type = '1'";
+        rs = statement.executeQuery(SQL);
+        // All sodar devices (equipment numbers).
+        List<String> eqList = new ArrayList<>();
+        while (rs.next()) {
+            eqList.add(rs.getString("equipment_no"));
+        }
+        // Skip the first 64 devices — presumably handled by a previous run; TODO confirm.
+        eqList = eqList.subList(64, eqList.size());
+        mysqlConn.close();
+        rs.close();
+
+        // --- TDengine connection ---
+        String jdbcUrl = "jdbc:TAOS://" + "192.168.1.114" + ":" + "6030" + "/" + "jiayueCloud" + "?user=" + "root" + "&password=" + "123456";
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
+        Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
+        Statement ps;
+        ps = conn.createStatement();
+        ResultSet tdengineRS = null;
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+        // Process every table of every device: one table per (device, height) pair.
+        List<String> heightList = new ArrayList<>();
+        for (int i = 10; i <= 200; i = i + 10) {
+            heightList.add(String.valueOf(i));
+        }
+        for (String eq : eqList) {
+            for (String height : heightList) {
+                String firstSQL = "select first(ts) from anemometry_" + eq + "_" + height;
+                String lastSQL = "select last(ts) from anemometry_" + eq + "_" + height;
+                tdengineRS = ps.executeQuery(firstSQL);
+                List<Map<String, String>> firstList = convertList(tdengineRS);
+                // Timestamp of the earliest row in the table.
+                Date firstDate = format.parse(firstList.get(0).get("first(ts)"));
+                tdengineRS = ps.executeQuery(lastSQL);
+                List<Map<String, String>> endList = convertList(tdengineRS);
+                // Timestamp of the latest row in the table.
+                Date lastDate = format.parse(endList.get(0).get("last(ts)"));
+                for (long i = firstDate.getTime(); i < lastDate.getTime(); i += 1000 * 60 * 60 * 24L) {
+                    Date date = DateTimeUtil.getDayLastTime1(i);
+                    // Fetch the 23:59 row of that day (exact-timestamp match).
+                    String sql = "select * from anemometry_" + eq + "_" + height + " where ts = '" + format1.format(date) + "'";
+                    tdengineRS = ps.executeQuery(sql);
+                    List<Map<String, String>> a = convertList(tdengineRS);
+                    if (!a.isEmpty()) {
+                        Date date1 = format1.parse(a.get(0).get("ts"));
+                        String time1 = format1.format(new Date(date1.getTime() + 1000 * 60L));
+                        // Re-insert the 23:59 row one minute later, i.e. at 00:00 of the next day.
+                        String sql1 = "INSERT INTO anemometry_" + eq + "_" + height + " USING prophase_anemometry_data TAGS(" + eq + "," + height + ") VALUES('" + time1 + "'," + a.get(0).get("ws_inst") + "," + a.get(0).get("ws_max") + "," + a.get(0).get("ws_min") + "," + a.get(0).get("ws_ave") + "," + a.get(0).get("ws_gust") + "," + a.get(0).get("ws_sta") + "," + a.get(0).get("wd_inst") + "," + a.get(0).get("wd_max") + "," + a.get(0).get("wd_min") + "," + a.get(0).get("wd_ave") + "," + a.get(0).get("wd_sta") + "," + a.get(0).get("data_flag") + ")";
+                        int update = ps.executeUpdate(sql1);
+                        System.out.println("anemometry_" + eq + "_" + height + "表, 插入" + update + "条数据, 数据时间为: " + time1);
+                    }
+                }
+                System.out.println("anemometry_" + eq + "_" + height + "表,结束");
+            }
+        }
+    }
+
+    /**
+     * Converts a ResultSet into a list of ordered column-name → string-value maps.
+     * The "ts" column is reformatted from "yyyy-MM-dd HH:mm:ss" into ISO-8601 with
+     * offset; SQL NULL in other columns becomes the literal string "NULL".
+     */
+    private static List<Map<String, String>> convertList(ResultSet rs) throws Exception {
+        List<Map<String, String>> list = new ArrayList<>();
+        ResultSetMetaData md = rs.getMetaData(); // column metadata (names)
+        int columnCount = md.getColumnCount(); // number of columns (original comment said "rows")
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+        while (rs.next()) {
+            Map<String, String> rowData = new LinkedHashMap<>(); // preserves column order
+            for (int i = 1; i <= columnCount; i++) {
+                if ("ts".equals(md.getColumnName(i))) {
+                    // Convert e.g. "2021-01-01 00:10:00.0" into "2021-01-01T00:10:00.000+08:00".
+                    Date date = format.parse(rs.getString(i));
+                    rowData.put(md.getColumnName(i), format1.format(date));
+                } else {
+                    rowData.put(md.getColumnName(i), rs.getString(i) == null ? "NULL" : rs.getString(i)); // column name and value
+                }
+            }
+            list.add(rowData);
+        }
+        return list;
+    }
+}

+ 15 - 0
src/main/java/com/example/datadump/util/DateTimeUtil.java

@@ -140,6 +140,21 @@ public class DateTimeUtil {
     }
 
     /**
+     * Returns 23:59:00.000 on the day containing the given instant, in the
+     * default time zone.
+     *
+     * <p>BUG FIX: the Javadoc previously claimed 23:59:59 although the code sets
+     * seconds to 0; the octal-style literal {@code 00} was replaced with {@code 0};
+     * and milliseconds are now cleared too, so callers that compare the result
+     * against a "yyyy-MM-dd'T'HH:mm:ss.SSS"-formatted timestamp match exactly.
+     *
+     * @param dateTime epoch milliseconds of any instant within the target day
+     * @return a Date at 23:59:00.000 of that day
+     */
+    public static Date getDayLastTime1(@NotNull final Long dateTime) {
+        Calendar date = Calendar.getInstance();
+        date.setTimeInMillis(dateTime);
+        date.set(Calendar.HOUR_OF_DAY, 23);
+        date.set(Calendar.MINUTE, 59);
+        date.set(Calendar.SECOND, 0);
+        date.set(Calendar.MILLISECOND, 0);
+        return date.getTime();
+    }
+
+    /**
      * 获取两个时间间的间隔天数(自然天)
      *
      * @param dateFrom 开始时间

+ 64 - 0
src/main/resources/logback.xml

@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 这个是根配置文件,一定要有的
+    scan:
+        是当配置文件被修改后会被重新加载
+    scanPeriod:
+        设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,
+        默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。
+    debug:
+        当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。
+        默认值为false。
+     -->
+<configuration>
+    <!-- 日志存放路径
+        下面的标签可以自己定义
+        name:相当于Map的key
+        value:就是map的value
+        ${catalina.base}是tomcat的当前路径
+        /logs:就是tomcat下的日志路径,
+        /ehrlog:如果没有目录会默认创建
+    -->
+    <property name="LOG_HOME" value="./logs/"/>
+    <!-- appender:
+        name相当于一个名称
+        class:确定要加载哪个类
+        encoder:一定要加 encoder,
+        默认配置为PatternLayoutEncoder
+        patter:必填
+        ConsoleAppender:输出在控制台上
+    -->
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度,%msg:日志消息,%n是换行符-->
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %highlight(%-5level) %cyan(%logger{50}) - %highlight(%msg%n)
+            </pattern>
+            <charset>UTF-8</charset>
+        </encoder>
+    </appender>
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <!-- 当前日志文件 -->
+        <file>${LOG_HOME}dataDump.log</file>
+        <!-- 编码 -->
+        <!--<Encoding>UTF-8</Encoding>-->
+        <!-- 按照时间来 -->
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <!--日志文件输出的文件名-->
+            <FileNamePattern>${LOG_HOME}dataDump.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
+            <!--日志文件保留天数-->
+            <MaxHistory>180</MaxHistory>
+            <maxFileSize>10MB</maxFileSize>
+            <totalSizeCap>1024MB</totalSizeCap>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
+        </rollingPolicy>
+        <!-- 布局 -->
+        <encoder>
+            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度,%msg:日志消息,%n是换行符-->
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+        </encoder>
+        <append>false</append>
+    </appender>
+    <root level="info">
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="FILE"/>
+    </root>
+</configuration>

+ 57 - 6
src/main/test/jiayue/service/TestDump.java

@@ -9,11 +9,62 @@ public class TestDump {
 
     @Test
     public void importTDEngine() throws Exception {
-        String s = "2020-12-31 00:10:00.0";
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
-        Date a = format.parse(s);
-        format1.format(a);
-        System.out.println(1);
+        /*
+                    log.info("开始解析sql文件,文件名:" + f.getName());
+                //场站编号
+                String name = f.getName().substring(0, 6);
+                //String csvFilePath = "D:"+File.separator+"bigsql"+File.separator+"csvdir"+File.separator + name + File.separator;
+                String csvFilePath = "/Users/xiaowang/个人/csvdir/" + name + File.separator;
+                //判断csvFilePath这个目录是否存在 如果不存在则创建目录
+                if (!new File(csvFilePath).exists()) {
+                    new File(csvFilePath).mkdirs();
+                }
+                String databasesName = "";
+                //获取文件输入流
+                BufferedInputStream input = new BufferedInputStream(Files.newInputStream(Paths.get(f.getPath())));
+                //获取ZIP输入流(一定要指定字符集Charset.forName("GBK")否则会报java.lang.IllegalArgumentException: MALFORMED)
+                ZipInputStream zipInputStream = new ZipInputStream(input, Charset.forName("GBK"));
+                //定义ZipEntry置为null,避免由于重复调用zipInputStream.getNextEntry造成的不必要的问题
+                ZipEntry ze = null;
+                //扫描解压目录下的sql文件 正常没有数据或只有一条数据
+                    while ((ze = zipInputStream.getNextEntry()) != null) {
+                        try (BufferedReader reader = new BufferedReader(new InputStreamReader(zipInputStream, StandardCharsets.UTF_8))) {
+                            List<String> sqlStatements = new ArrayList<>();
+                            StringBuilder statementBuilder = new StringBuilder();
+                            String line;
+                            int lineSize = 0;
+                            while ((line = reader.readLine()) != null) {
+                                if (line.startsWith("Navicat") || line.startsWith("/")
+                                        || line.startsWith("INSERT") || line.startsWith("--") || line.startsWith("#")
+                                        || line.startsWith("//")) {
+                                    lineSize++;
+                                    //sql内的数据转换成csv文件
+                                    spliteFileToCsv(line, fileWriterMap, csvFilePath);
+                                    continue; // Skip comments
+                                }
+                                if (line.endsWith(";")) {
+                                    statementBuilder.append(line); // Remove semicolon
+                                    sqlStatements.add(statementBuilder.toString().trim()); // Add to list
+                                    statementBuilder.setLength(0); // Reset statement buffer
+                                } else {
+                                    statementBuilder.append(line).append(" ");
+                                }
+                            }
+                            reader.close();
+                            sqlStatements = sqlStatements.stream().filter(s -> s.startsWith("CREATE") || s.startsWith("DROP")).collect(Collectors.toList());
+                            System.out.println(sqlStatements);
+                            databasesName = "ipfcst_" + name + "_" + DateUtil.format(new Date(), DatePattern.PURE_DATETIME_FORMAT);
+                            Connection conn = JyDbUtil.conn;
+                            if (createDatabase(databasesName, conn)) {
+                                if (createTable(databasesName, sqlStatements, conn)) {
+                                    log.info("执行数据库和数据表创建成功,其中数据共:" + lineSize);
+                                }
+                            }
+                        } catch (IOException e) {
+                            updateRecords(uploadBigsqlFileRecordBaseVO.getId(), "2", databasesName);
+                            throw new RuntimeException("读取Sql文件失败:  " + f.getName(), e);
+                        }
+                        zipInputStream.close();
+                    }*/
     }
 }

+ 11 - 1
src/test/java/com/example/datadump/DataDumpApplicationTests.java

@@ -1,13 +1,23 @@
 package com.example.datadump;
 
+import cn.hutool.core.io.FileUtil;
+import com.example.bigsql.service.CreateDatabasesByV3File;
 import org.junit.jupiter.api.Test;
 import org.springframework.boot.test.context.SpringBootTest;
 
+import javax.annotation.Resource;
+
 @SpringBootTest
 class DataDumpApplicationTests {
+    @Resource
+    private CreateDatabasesByV3File createDatabasesByV3File;
 
     @Test
-    void contextLoads() {
+    void contextLoads() throws Exception {
+        String csvFilePath = "/Users/xiaowang/个人/csvdir/";
+        String sqlPath = "/Users/xiaowang/个人/sqldir/";
+        FileUtil.del(sqlPath);
+        FileUtil.del(csvFilePath);
     }
 
 }