David 1 ماه پیش
والد
کامیت
e4c42b3fda
3فایلهای تغییر یافته به همراه1035 افزوده شده و 0 حذف شده
  1. 534 0
      DataBase/turbine-cluster/data_analysis.py
  2. 84 0
      DataBase/turbine-cluster/data_cleaning.py
  3. 417 0
      DataBase/turbine-cluster/inputData.py

+ 534 - 0
DataBase/turbine-cluster/data_analysis.py

@@ -0,0 +1,534 @@
+# !usr/bin/env python
+# -*- coding:utf-8 _*-
+"""
+@Author:Lijiaxing
+ 
+@File:data_analysis.py
+@Time:2023/4/24 15:16
+
+"""
+import os.path
+import matplotlib.colors as mcolors
+colors = list(mcolors.XKCD_COLORS.keys())
+import pandas as pd
+# from mpl_toolkits.basemap import Basemap
+from scipy.signal import savgol_filter
+import numpy as np
+import matplotlib.pyplot as plt
+from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
+from sklearn.metrics import silhouette_samples, silhouette_score
+
+def paint_others(y):
+    """ 绘制其他数据 """
+    plt.plot([j for j in range(y)], y)
+    # 添加标题和标签
+    plt.xlabel('x')
+    plt.ylabel('y')
+
+    # 显示图形
+    plt.show()
+
+
+def compute_cos_similarity(a, b):
+    """
+    Compute the cosine similarity of two vectors.
+    :param a: vector a
+    :param b: vector b
+    :return: cosine similarity value
+    """
+    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
+
+
+def compute_pearsonr(a):
+    """
+    Compute the Pearson correlation coefficients and return the similarity matrix.
+    :param a: n*m matrix, n samples of dimension m
+    :return: n*n similarity (correlation) matrix
+    """
+    return np.corrcoef(a)
+
+
+def compute_distance(a, b):
+    """
+    Compute the Euclidean distance between two vectors.
+    :param a: vector a
+    :param b: vector b
+    :return: Euclidean distance between a and b
+    """
+    return np.linalg.norm(a - b)
+
+
+def hierarchical_clustering(data, threshold, similarity_func):
+    """
+    Hierarchical clustering via scipy.cluster.hierarchy linkage/fcluster.
+    :param data: n*m matrix, n samples of dimension m
+    :param threshold: distance threshold used to cut the dendrogram into
+        clusters; tune as needed, may exceed 1
+    :param similarity_func: function returning an n*n similarity matrix;
+        if replaced with a distance function, adapt the code below
+    :return: n-element array of cluster labels, one per sample
+    """
+    # Similarity matrix of the data
+    similarity_matrix = similarity_func(data)
+
+    # Distance matrix derived from similarity
+    distance_matrix = 1 - similarity_matrix
+
+    # NOTE(review): linkage() interprets a 2-D array as raw observations,
+    # not as a square distance matrix (squareform would be needed for
+    # that) — confirm this is intended.
+    Z = linkage(distance_matrix, method='ward')
+    # Cut the tree at the given distance threshold
+    clusters = fcluster(Z, t=threshold, criterion='distance')
+    print(clusters)
+    # Plot the dendrogram of the hierarchy
+    # fig = plt.figure(figsize=(5, 3))
+    # dn = dendrogram(Z)
+    # plt.show()
+
+    # clusters[42] = 1
+    silhouette = silhouette_samples(np.abs(distance_matrix), clusters, metric='euclidean')
+    silhouette1 = silhouette_score(np.abs(distance_matrix), clusters, metric='euclidean')
+    print(f"平均轮廓系数为:{silhouette1}, 单个样本的轮廓系数:{silhouette}")
+    return clusters
+
+
+class DataAnalysis:
+    """
+    Data analysis for wind-turbine time series: differencing, FFT
+    filtering, hierarchical clustering and plotting.
+    """
+
+    def __init__(self, data_length, data_start, data_end):
+        """
+        Initialize.
+        :param data_length: length of the analysed data segment
+        :param data_start: start index of the analysed data segment
+        :param data_end: end index of the analysed data segment
+        """
+        # Raw turbine power data after Fourier-transform filtering
+        self.ori_turbine_power = None
+        self.ori_turbine_fft = None
+        # Raw turbine data segments (C_WS slices; see compute_turbine_diff)
+        self.ori_turbine_pic = None
+        # Clustering result
+        self.cluster = None
+        # Smoothed turbine power differences
+        self.smooth_turbine_diff = None
+        # Sign (+1/-1/0) of each power difference
+        self.diff_change = None
+        # Turbine power differences
+        self.turbine_diff = None
+        # All turbine data
+        self.turbine = None
+        # Ordered list of turbine ids
+        self.turbine_id = None
+        # b1b4 = [142, 143, 144, 145]
+        # self.turbine_id = [id for id in self.turbine_id if id not in b1b4]
+        # 15-minute-level turbine power data (never loaded in this file)
+        self.power_15min = None
+        # Turbine latitude/longitude info (never loaded in this file)
+        self.info = None
+        # Length of the data used
+        self.data_length = data_length
+        # Start position of the data used
+        self.data_start = data_start
+        # End position of the data used
+        self.data_end = data_end
+        # Load the data
+        self.load_data(normalize=True)
+        # Compute the turbine power differences
+        self.compute_turbine_diff()
+
+    def load_data(self, normalize=False):
+        """
+        Load per-turbine data from the common pickle and optionally
+        z-score normalize it.
+        :param normalize: when True, call self.normalize() after loading
+        :return: None
+        """
+        import pickle
+        with open('./turbine_dict_common.pickle', 'rb') as f:
+            turbine_dict = pickle.load(f)
+        self.turbine_id = list(turbine_dict.keys())
+        # self.info = pd.read_csv('../data-process/data/风机信息.csv', encoding='utf-8')
+        # power_15min = pd.read_csv('../data/power_15min.csv')
+        # for i in range(len(power_15min)):
+        #     if power_15min.loc[i, 'C_REAL_VALUE'] == -9999:
+        #         # 方便在曲线中看出缺失数据位置
+        #         power_15min.loc[i, 'C_REAL_VALUE'] = -34.56789
+        # self.power_15min = power_15min
+        turbine_path = '../data-process/data/output_filtered_csv_files/turbine-{}.csv'
+        self.turbine, turbines = {}, []
+        for i in self.turbine_id:
+            group = turbine_dict[i]
+            group.rename(columns={"时间":"C_TIME", "平均风速":"C_WS", "平均有功功率":"C_REAL_VALUE"},  inplace=True)
+            group = group.loc[:, ['C_TIME', 'C_WS', 'C_REAL_VALUE']]
+            self.turbine[i] = group
+        # for i in self.turbine_id:
+        #     self.turbine[i] = pd.read_csv(turbine_path.format(i))[20:].reset_index(drop=True)
+        if normalize is True:
+            self.normalize()
+
+    def normalize(self):
+        """
+        Z-score normalize the numeric columns using mean/std pooled over
+        all turbines, preserving the C_TIME column.
+        :return: the normalized self.turbine dict
+        """
+        turbines = [self.turbine[i].values[:, 1:].astype(np.float32) for i in self.turbine_id]
+        turbines = np.vstack(turbines)
+        mean, std = np.mean(turbines, axis=0), np.std(turbines, axis=0)
+        for i in self.turbine_id:
+            c_time = self.turbine[i]['C_TIME']
+            self.turbine[i] = (self.turbine[i].iloc[:, 1:] - mean) / std
+            self.turbine[i].insert(loc=0, column='C_TIME', value=c_time)
+        return self.turbine
+
+    def compute_turbine_diff(self):
+        """
+        Compute per-turbine wind-speed first differences, their signs,
+        the FFT-filtered series and a smoothed version of the differences.
+        :return: None
+        """
+        turbine_diff = []
+        ori_turbine_pic = []
+        ori_turbine_power = []
+        for turbine_i in self.turbine_id:
+            ori = np.array(self.turbine[turbine_i]['C_WS'].values[self.data_start:self.data_end + 1])
+            diff_array = np.diff(ori)
+            smoothness_value = np.std(diff_array)
+            print("turbine-{}的平滑度是:{}".format(turbine_i, round(smoothness_value, 2)))
+            turbine_diff.append(diff_array)
+            ori_turbine_pic.append(self.turbine[turbine_i]['C_WS'].values[self.data_start:self.data_end])
+            ori_turbine_power.append(self.turbine[turbine_i]['C_REAL_VALUE'].values[self.data_start:self.data_end])
+
+        self.ori_turbine_power = ori_turbine_power
+        self.ori_turbine_pic = ori_turbine_pic
+        self.turbine_diff = turbine_diff
+
+        # Encode each difference as +1 (rise), -1 (fall) or 0 (flat)
+        diff_change = []
+        for diff_i in turbine_diff:
+            single_diff_change = []
+            for diff_i_i in diff_i:
+                if diff_i_i > 0:
+                    single_diff_change.append(1)
+                elif diff_i_i < 0:
+                    single_diff_change.append(-1)
+                else:
+                    single_diff_change.append(0)
+            diff_change.append(single_diff_change)
+        self.diff_change = diff_change
+        self.ori_turbine_fft = [self.turbine_fft(i + 1) for i in range(len(self.ori_turbine_pic))]
+
+        # Smooth the differences
+        self.turbine_smooth(window_size=21)
+
+    def paint_map(self):
+        """
+        Plot turbine locations on a world map.
+        NOTE(review): the Basemap import is commented out at the top of
+        this file, so calling this raises NameError — confirm before use.
+        :return: None
+        """
+        lats = self.info['纬度'].values
+        lons = self.info['经度'].values
+        map = Basemap()
+
+        # Draw coastlines and country borders
+        map.drawcoastlines()
+        map.drawcountries()
+
+        # Draw meridians and parallels
+        map.drawmeridians(range(0, 360, 30))
+        map.drawparallels(range(-90, 90, 30))
+
+        # Plot the points
+
+        x, y = map(lons, lats)
+        map.plot(x, y, 'bo', markersize=10)
+
+        # Show the chart
+        plt.show()
+
+    def paint_power15min(self):
+        """
+        Plot the 15-minute power curve.
+        NOTE(review): self.power_15min is only ever set to None in this
+        file (its loading code is commented out) — confirm before calling.
+        :return: None
+        """
+
+        plt.plot(self.power_15min['C_REAL_VALUE'])
+
+        # Set chart title and axis labels
+        plt.title('Data Time Change Curve')
+        plt.xlabel('Date')
+        plt.ylabel('Value')
+
+        # Show the chart
+        plt.show()
+
+    def paint_lats_lons(self):
+        """
+        Scatter-plot turbine latitude/longitude with id annotations.
+        NOTE(review): self.info is never loaded in this file (the read is
+        commented out) — confirm before calling.
+        :return: None
+        """
+        x = self.info['纬度'].values
+        y = self.info['经度'].values
+
+        # Scatter plot
+        fig, ax = plt.subplots()
+        plt.scatter(x, y)
+
+        for i, txt in enumerate(self.info['id'].values):
+            ax.annotate(txt, (x[i], y[i]))
+
+        # Set axis labels
+        plt.xlabel('lats')
+        plt.ylabel('lons')
+
+        # Show the chart
+        plt.show()
+
+    def similarity_score(self, turbine_diff, threshold=0.5):
+        """
+        Compute pairwise cosine similarities and return, per turbine, the
+        indices whose similarity exceeds the threshold.
+        NOTE(review): the turbine count 49 is hard-coded here — confirm
+        it matches len(self.turbine_id).
+        :param turbine_diff: n*m matrix, n series of dimension m
+        :param threshold: similarity threshold
+        :return: dict {index: [indices with similarity > threshold]}
+        """
+        similarity = {i: [] for i in range(49)}
+        similarity_index = {i: [] for i in range(49)}
+        for turbine_i in range(49):
+            for turbine_j in range(49):
+                cos_similarity = compute_cos_similarity(turbine_diff[turbine_i], turbine_diff[turbine_j])
+                similarity[turbine_i].append(cos_similarity)
+                if cos_similarity > threshold:
+                    similarity_index[turbine_i].append(turbine_j)
+        return similarity_index
+
+    def mapping_turbines(self):
+        """Print the turbine-id -> cluster-label mapping."""
+        for a, b in zip(self.turbine_id, self.cluster):
+            print("风机编号:{},类别:{}".format(a, b))
+
+    def paint_turbine(self, paint_default=True):
+        """
+        Plot per-cluster averaged curves (the geographic scatter code is
+        commented out).
+        :param paint_default: default True; originally controlled drawing
+            a line chart per cluster (that call is commented out below)
+        :return: None
+        """
+
+        # y = self.info['纬度'].values
+        # x = self.info['经度'].values
+        #
+        # fig, ax = plt.subplots(figsize=(15, 15))
+        #
+        # plt.scatter(x, y, c=self.cluster)
+        # for i, txt in enumerate(self.info['C_ID'].values):
+        #     ax.annotate(txt, (x[i], y[i]))
+
+        # Set chart title and axis labels
+        # plt.xlabel('lons')
+        # plt.ylabel('lats')
+        # plt.legend()
+        #
+        # # 显示图表
+        # plt.savefig('analysis_img/turbine_cluster.png')
+        # plt.show()
+
+        plt.figure(figsize=(60, 40))
+        cmap = plt.get_cmap('viridis')
+        linestyle= ['solid', 'dashed', 'dotted', 'dashdot']
+        for i in range(max(self.cluster)):
+            cluster, cluster_fft, cluster_power, = [], [], []
+            for j, item in enumerate(self.cluster):
+                if item == i + 1:
+                    cluster.append(self.ori_turbine_pic[j])
+                    cluster_fft.append(self.ori_turbine_fft[j])
+                    cluster_power.append(self.ori_turbine_power[j])
+            cluster_power = np.average(cluster_power, axis=0)
+            cluster_fft = np.average(cluster_fft, axis=0)
+            cluster = np.average(cluster, axis=0)
+            diff_array = np.diff(cluster)
+            smoothness_value = np.std(diff_array)
+            print("聚类-{}的平滑度是:{}".format(i+1, smoothness_value))
+            # NOTE(review): a colormap called with an int indexes its
+            # lookup table (saturating past the end), so i*200 likely
+            # yields the same color for i >= 2 — confirm intent.
+            color = cmap(i*200)
+            plt.figure(1)
+            # plt.subplot(max(self.cluster), 1, 1)
+            # print("----", cluster, linestyle[i])
+            # plt.plot([j for j in range(len(cluster))], cluster, color=color, label='cluster'+str(i))
+            # plt.subplot(max(self.cluster), 1, 2)
+            # plt.plot([j for j in range(len(cluster_fft))], cluster_fft, color=color, label='cluster'+str(i))
+
+            # ws_power_dict = {}
+            # for c, p in zip(cluster, cluster_power):
+            #     ws_power_dict.setdefault(round(c, 2), []).append(round(p, 2))
+            #
+            # for key, value in ws_power_dict.items():
+            #     ws_power_dict[key] = round(np.average(value), 2)
+            # print(ws_power_dict)
+            # plt.scatter(cluster, cluster_power, color=color, label='cluster' + str(i),
+            #          linestyle=linestyle[i], s=1, alpha=0.2)
+
+        # Add legend
+        # plt.legend()
+        # # 显示图形
+        # plt.savefig('./clusters.png')
+        # plt.show()
+        # if paint_default:
+        #     for i in range(max(self.cluster)):
+        #         self.paint_turbine_k(i + 1)  # 画出聚类中每个风机的曲线
+
+
+
+    def turbine_smooth(self, window_size=50):
+        """
+        Smooth each per-turbine difference series with a Savitzky-Golay
+        filter.
+
+        Parameters:
+        window_size -- filter window length, int
+
+        Result:
+        self.smooth_turbine_diff -- list of smoothed numpy arrays
+        """
+
+        # weights = np.repeat(1.0, window_size) / window_size
+        smooth_data = []
+        for turbine_diff_i in self.turbine_diff:
+            smooth_y = savgol_filter(turbine_diff_i, window_length=window_size, polyorder=3)
+            smooth_data.append(smooth_y)
+        #     smooth_data.append(np.convolve(turbine_diff_i, weights, 'valid'))
+        self.smooth_turbine_diff = smooth_data
+
+    def paint_turbine_k(self, k):
+        """
+        Plot the FFT-filtered curves of every turbine in cluster k.
+        :param k: cluster label (1-based)
+        :return: None
+        """
+        pic_label = []
+        y = []
+        plt.figure(figsize=(20, 10))
+        cmap = plt.get_cmap('viridis')
+        for i, item in enumerate(self.cluster):
+            if item == k:
+                pic_label.append('turbine-' + str(self.turbine_id[i]))
+                y.append(self.ori_turbine_fft[i])
+        for i in range(len(y)):
+            color = cmap(i / 10)
+            plt.plot([j for j in range(len(y[i]))], y[i], color=color, label=pic_label[i])
+        # Labels and title
+        plt.xlabel('x')
+        plt.ylabel('y')
+        plt.title('Cluster {}'.format(k))
+
+        # Add legend
+        plt.legend()
+        # Save and show the figure
+        plt.savefig('analysis_img/cluster/cluster_{}.png'.format(k))
+        plt.show()
+
+    def turbine_fft(self, k):
+        """
+        FFT-filter the k-th turbine's raw series: keep only frequency
+        components above the 98th percentile magnitude, then invert.
+        :param k: 1-based turbine index in load order
+        :return: filtered signal (complex-valued, np.fft.ifft output)
+        """
+        y = self.ori_turbine_pic
+        t = np.linspace(0, 1, self.data_length)
+        signal = y[k - 1]
+
+        # Forward Fourier transform
+        freq = np.fft.fftfreq(len(signal), t[1] - t[0])
+        spectrum = np.fft.fft(signal)
+        spectrum_abs = np.abs(spectrum)
+        threshold = np.percentile(spectrum_abs, 98)
+        indices = spectrum_abs > threshold
+        spectrum_clean = indices * spectrum
+
+        # Inverse Fourier transform
+        signal_clean = np.fft.ifft(spectrum_clean)
+        # plt.figure(figsize=(20, 10))
+        #
+        # # 绘制时域信号
+        # plt.subplot(4, 1, 1)
+        # plt.plot(t, signal)
+        # plt.title(self.turbine_id[k-1])
+        #
+        # # 绘制频域信号
+        # plt.subplot(4, 1, 2)
+        # plt.plot(freq, np.abs(spectrum))
+        #
+        # # 绘制过滤后的频域信号
+        # plt.subplot(4, 1, 3)
+        # plt.plot(freq, np.abs(spectrum_clean))
+        #
+        # # 绘制过滤后的时域信号
+        # plt.subplot(4, 1, 4)
+        # plt.plot(t, signal_clean)
+        #
+        # plt.savefig('analysis_img/fft/{}_turbine_fft.png'.format(self.turbine_id[k-1]))
+        # plt.show()
+        return signal_clean
+
+    def paint_double(self, i, j):
+        """
+        Plot the FFT-filtered curves of two turbines for comparison.
+        :param i: 1-based turbine index in load order
+        :param j: 1-based turbine index in load order
+        :return: None
+        """
+        y = self.ori_turbine_fft
+        x = [index for index in range(self.data_length)]
+        data_i = y[i - 1]
+        data_j = y[j - 1]
+
+        plt.figure(figsize=(20, 10))
+        plt.plot(x, data_i, label='turbine {}'.format(self.turbine_id[i - 1]), linestyle='solid')
+        plt.plot(x, data_j, label='turbine {}'.format(self.turbine_id[j - 1]), linestyle='dashed')
+
+        plt.title('{} and {}'.format(i, j))
+        plt.legend()
+        plt.savefig('analysis_img/{}_{}_turbine.png'.format(self.turbine_id[i - 1], self.turbine_id[j - 1]))
+        plt.show()
+
+    def process_ori_data(self):
+        """
+        Process the raw data: cluster on the FFT-filtered series, then plot.
+        :return: None
+        """
+        self.turbine_clusters(self.ori_turbine_fft)
+        self.paint_turbine()
+
+    def turbine_clusters(self, data=None):
+        """
+        Cluster the turbines; the result is stored in self.cluster.
+        :param data: optional n*m matrix (n samples, m dimensions) to
+            cluster instead of the raw differences
+        :return: None
+        """
+        if data is None:
+            cluster = hierarchical_clustering(self.turbine_diff, threshold=1.4,
+                                              similarity_func=compute_pearsonr)  # hierarchical clustering
+        else:
+            cluster = hierarchical_clustering(data, threshold=0.6,
+                                              similarity_func=compute_pearsonr)
+        self.cluster = cluster
+        # The cluster variable could be persisted here
+        # from cluster_analysis import cluster_power_list_file, cluster_power_list_folder
+        #
+        # output_path = '../data-process/data/cluster_power/'
+
+        # cluster_power_list_file(self.cluster, self.turbine_id,
+        #                         input_path='../data-process/data/output_filtered_csv_files/', output_path=output_path)
+        # cluster_power_list_folder(self.cluster, self.turbine_id, input_path='../data-process/data/continuous_data/',
+        #                           output_path=output_path)
+
+if __name__ == '__main__':
+    import pickle
+    # Load the raw per-turbine dict; the commented block below was used
+    # once to intersect timestamps across turbines and write the
+    # "common" pickle consumed by DataAnalysis.load_data.
+    with open('./turbine_dict.pickle', 'rb') as f:
+        turbine_dict = pickle.load(f)
+    # number, dt = 0, []
+    # for key, turbine in turbine_dict.items():
+    #     if number == 0:
+    #         dt = turbine['时间']
+    #         number += 1
+    #     else:
+    #         dt = pd.to_datetime(list(set(dt) & set(turbine['时间'])))
+    # for key, turbine in turbine_dict.items():
+    #     turbine_dict[key] = turbine[turbine['时间'].isin(dt)]
+    # with open('./turbine_dict_common.pickle', 'wb') as f:
+    #     pickle.dump(turbine_dict, f)
+
+    data_analysis = DataAnalysis(data_length=34550,
+                                 data_start=0,
+                                 data_end=34550)
+
+    data_analysis.process_ori_data()
+    data_analysis.mapping_turbines()
+    # data_analysis.paint_double(1, 56)

+ 84 - 0
DataBase/turbine-cluster/data_cleaning.py

@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/10/11 11:00
+# file: data_cleaning.py
+# author: David
+# company: shenyang JY
+import numpy as np
+np.random.seed(42)
+
+def cleaning(df, name, cols=None, dup=True):
+    """
+    Clean a DataFrame: drop uninformative columns, optionally drop
+    duplicate timestamps, and optionally filter rows on key fields.
+    :param df: input DataFrame (left unmodified; a copy is cleaned)
+    :param name: dataset name, used only for the log message
+    :param cols: key column names for row filtering; skipped when None
+    :param dup: whether to drop duplicate C_TIME rows
+    :return: cleaned copy of df
+    """
+    print("开始清洗:{}……".format(name))
+    data = df.copy()
+    data = data_column_cleaning(data)
+    if dup:
+        data = rm_duplicated(data)
+    if cols is not None:
+        data = key_field_row_cleaning(data, cols)
+    return data
+
+def data_column_cleaning(data, clean_value=[-99.0, -99]):
+    """
+    列的清洗
+    :param data:
+    :param clean_value:
+    :return:
+    """
+    data1 = data.copy()
+    cols_pre = data.columns.to_list()
+    for val in clean_value:
+        data1 = data1.replace(val, np.nan)
+    # nan 列超过80% 删除
+    data1 = data1.dropna(axis=1, thresh=len(data) * 0.8)
+    # 删除取值全部相同的列
+    data1 = data1.loc[:, (data1 != data1.iloc[0]).any()]
+    data = data[data1.columns.tolist()]
+    cols_late = data.columns.tolist()
+    if len(cols_pre) > len(cols_late):
+        print("清洗的列有:{}".format(set(cols_pre) - set(cols_late)))
+    return data
+
+
+def interpolation(data):
+    # Fill remaining NaNs by backward fill.
+    # NOTE(review): the original comment claimed linear interpolation,
+    # but bfill() propagates the next valid value backwards instead.
+    data = data.bfill()
+    return data
+
+
+def key_field_row_cleaning(data, cols):
+    """
+    Row-level cleaning on key fields: drop rows holding a negative
+    "-99"-style sentinel, and drop rows with nulls in those fields.
+    :param data: input DataFrame
+    :param cols: list of key column names to check
+    :return: filtered DataFrame
+    """
+    rows_pre = len(data)
+    for col in cols:
+        if col in data.columns.tolist():
+            # A row is dropped when the value is negative AND its string
+            # form contains "99" (catches -99, -99.0, etc.)
+            data = data[~((data.loc[:, col] < 0) & (data.loc[:, col].astype(str).str.contains('99')))]
+            data = data[~data.loc[:, col].isnull()]
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("清洗的行数有:", rows_pre-rows_late)
+    return data
+
+def rm_duplicated(data):
+    """
+    De-duplicate rows by timestamp.
+    :param data: DataFrame with a C_TIME column
+    :return: DataFrame with duplicate C_TIME rows dropped (first kept)
+    """
+    # Drop duplicates on the time column
+    rows_pre = len(data)
+    data = data.drop_duplicates(subset='C_TIME')
+    # data = data.groupby(by='C_TIME').mean()
+    # data.reset_index(inplace=True)
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("时间去重的行数有:", rows_pre - rows_late)
+    return data
+
+if __name__ == '__main__':
+    import pandas as pd
+    # Ad-hoc smoke test: column-clean the raw power export
+    power = pd.read_csv('./data/power.csv')
+    x = data_column_cleaning(power)

+ 417 - 0
DataBase/turbine-cluster/inputData.py

@@ -0,0 +1,417 @@
+import pandas as pd
+import datetime, time
+import re
+import os
+import pymysql
+from sqlalchemy import create_engine
+import pytz
+from data_cleaning import cleaning, rm_duplicated, key_field_row_cleaning
+
+current_path = os.path.dirname(__file__)
+dataloc = current_path + '/data/'
+
+def readData(name):
+    """
+    Read a cached CSV by file name.
+    :param name: file name under ./cache/data/
+    :return: pandas DataFrame
+    """
+    path =  r"./cache/data/" + name
+    return pd.read_csv(path)
+
+
+def saveData(name, data):
+    """
+    Save a DataFrame as CSV under ./cache/data/, creating the directory
+    if needed.
+    :param name: file name
+    :param data: DataFrame to save (index is not written)
+    :return: None
+    """
+    path =  r"./cache/data/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    data.to_csv(path, index=False)
+
+
+def timestamp_to_datetime(ts):
+    local_timezone = pytz.timezone('Asia/Shanghai')
+    if type(ts) is not int:
+        raise ValueError("timestamp-时间格式必须是整型")
+    if len(str(ts)) == 13:
+        dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    elif len(str(ts)) == 10:
+        dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    else:
+        raise ValueError("timestamp-时间格式错误")
+
+def dt_tag(dt):
+    """
+    1-based slot index of a datetime within its day on a 15-minute grid
+    (1 for 00:00, 2 for 00:15, ...).
+    :param dt: datetime to index
+    :return: slot number (float when dt is not on the grid)
+    """
+    date = dt.replace(hour=0, minute=0, second=0)
+    delta = (dt - date) / pd.Timedelta(minutes=15)
+    return delta + 1
+
+def timestr_to_timestamp(time_str):
+    """
+    Convert a time string to a millisecond epoch timestamp; non-string
+    inputs are returned unchanged.
+    :param time_str: 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' string, or an
+        already-numeric timestamp
+    :return: int millisecond timestamp, or the input if it is not a str
+    :raises ValueError: for strings of unexpected length
+    """
+    if isinstance(time_str, str):
+        if len(time_str) == 10:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
+            return int(round(time.mktime(dt.timetuple())) * 1000)
+        elif len(time_str) in {17, 18, 19}:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')   # strptime must match the string's format exactly
+            return int(round(time.mktime(dt.timetuple())) * 1000)   # convert to a millisecond timestamp
+        else:
+            raise ValueError("时间字符串长度不满足要求!")
+    else:
+        return time_str
+
+
+class DataBase(object):
+    def __init__(self, begin, end, opt, logger):
+        """
+        :param begin: extraction window start time
+        :param end: extraction window end time (currently unused; the
+            derived attributes below are commented out)
+        :param opt: options object providing the database DSN, tower and
+            turbine locations, etc.
+        :param logger: logger instance
+        """
+        self.begin = begin
+        self.opt = opt
+        # NOTE(review): several methods reference self.begin_stamp /
+        # self.his_begin_stamp / self.end_stamp, which are commented out
+        # here — confirm they are set elsewhere before those methods run.
+        # self.his_begin = self.begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)
+        # self.end = end + pd.Timedelta(days=1) - pd.Timedelta(minutes=15)
+        # self.begin_stamp = timestr_to_timestamp(str(begin))
+        # self.his_begin_stamp = timestr_to_timestamp(str(self.his_begin))
+        # self.end_stamp = timestr_to_timestamp(str(self.end))
+        self.database = opt.database
+        self.logger = logger
+        # self.towerloc = self.opt.tower
+
+    def clear_data(self):
+        """
+        Delete every cached CSV under the data directory (recursively).
+        :return: None
+        """
+        # Directory to purge
+        import glob
+        import os
+        folder_path = dataloc
+
+        # Collect all .csv paths recursively
+        csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
+
+        # Remove each .csv file
+        for file_path in csv_files:
+            os.remove(file_path)
+        self.logger.info("清除所有csv文件")
+
+    def create_database(self):
+        """
+        Create a SQLAlchemy engine for the configured DSN (self.database).
+        :return: SQLAlchemy engine
+        """
+        engine = create_engine(self.database)
+        return engine
+
+    def exec_sql(self, sql, engine):
+        """
+        Run a query against the database.
+        :param sql: SQL statement
+        :param engine: database engine
+        :return: result DataFrame
+        """
+        df = pd.read_sql_query(sql, engine)
+        return df
+
+    def get_process_NWP(self):
+        """
+        Fetch NWP (numerical weather prediction) data from the database,
+        clean it lightly and export it to NWP.csv.
+        NOTE(review): relies on self.begin_stamp / self.end_stamp, which
+        are commented out in __init__ — confirm they exist before calling.
+        :return: cleaned NWP DataFrame
+        """
+        # NWP data
+        engine = self.create_database()
+        sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
+                  "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION,  " \
+                  "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
+                  "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp" \
+                  " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # wind-specific NWP fields
+        NWP = self.exec_sql(sql_NWP, engine)
+
+        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
+
+        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
+        # NWP['DT_TAG'] = NWP.apply(lambda x: dt_tag(x['C_TIME']), axis=1)
+        NWP = cleaning(NWP, 'NWP')
+        # NWP = self.split_time(NWP)
+        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("NWP.csv", NWP)
+        self.logger.info("导出nwp数据")
+        return NWP
+
+    def get_process_tower(self):
+        """
+        Export wind-tower (environment monitor) data to per-tower CSVs.
+        The rows currently come from a cached CSV; only the column names
+        are taken from the database.
+        :return: None
+        """
+        engine = self.create_database()
+        self.logger.info("提取测风塔:{}".format(self.opt.towerloc))
+        for i in [self.opt.towerloc]:
+            # Columns to drop (ids, spare data slots, status flags)
+            drop_colmns = ["C_ID","C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"]
+
+            get_colmns = []
+            # # Query the table's column names
+            result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
+
+            tower = pd.read_csv("./cache/data/t_wind_tower_status_data.csv")
+            tower.columns = list(result_set['Field'].values)
+
+            for name in result_set.iloc[:,0]:
+                if name not in drop_colmns:
+                    get_colmns.append(name)
+
+            all_columns_str = ", ".join([f'{col}' for col in get_colmns])
+            # tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO="+str(i) + " and C_TIME between '{}' and '{}'".format(self.his_begin, self.end)
+            # tower = self.exec_sql(tower_sql, engine)
+            tower = tower[all_columns_str.split(', ')]
+            # tower.drop(columns=drop_colmns, inplace=True)
+            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
+            saveData("/tower-{}.csv".format(i), tower)
+            self.logger.info("测风塔{}导出数据".format(i))
+
+    def get_process_power(self):
+        """
+        Build the plant-level power dataset: merge cached station power
+        with the curtailment export, drop out-of-range values and save.
+        Also stores the plant capacity in self.opt.cap as a side effect.
+        :return: None
+        """
+        powers = pd.read_csv('./cache/data/t_power_station_status_data.csv')
+        shouzu = pd.read_csv('./cache/data/全场_原始数据_2024-11-23_16-22-42.csv')
+        shouzu.rename(columns={'时间':'C_TIME', '场外受阻发电量': 'SHOUZU'}, inplace=True)
+        shouzu['C_TIME'] = pd.to_datetime(shouzu['C_TIME'])
+        # Keep only timestamps with no off-site curtailed generation
+        shouzu = shouzu[~(shouzu['SHOUZU'] > 0)]
+
+
+        engine = self.create_database()
+        sql_cap = "select C_CAPACITY from t_electric_field"
+        cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
+        self.opt.cap = float(cap)
+
+        result_set = self.exec_sql("SHOW COLUMNS FROM t_power_station_status_data", engine)
+        powers.columns = list(result_set.iloc[:, 0].values)
+        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
+        # Inner merge keeps only timestamps present in both sources
+        powers = pd.merge(powers, shouzu, on='C_TIME')
+        powers = powers[['C_TIME', 'C_REAL_VALUE', 'C_ABLE_VALUE', 'C_IS_RATIONING_BY_MANUAL_CONTROL', 'C_IS_RATIONING_BY_AUTO_CONTROL']]
+
+        # if self.opt.usable_power["clean_power_by_signal"]:
+        #     sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and  C_IS_RATIONING_BY_AUTO_CONTROL=0"
+        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
+        # Flag negative values, values above capacity, and -99 sentinels
+        mask2 = powers['C_REAL_VALUE'] < 0
+        mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap)
+        mask = powers['C_REAL_VALUE'] == -99
+
+        mask = mask | mask1 | mask2
+        self.logger.info("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum()))
+        powers = powers[~mask]
+        self.logger.info("剔除完后还剩{}条".format(len(powers)))
+        # binary_map = {b'\x00': 0, b'\x01': 1}
+        # powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
+        powers = rm_duplicated(powers)
+        saveData("power_filter4.csv", powers)
+
+    def get_process_dq(self):
+        """
+        Fetch the short-term forecast results, clean and export to dq.csv.
+        NOTE(review): relies on self.his_begin_stamp / self.end_stamp,
+        which are commented out in __init__ — confirm they exist.
+        :return: None
+        """
+        engine = self.create_database()
+        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
+                 "where C_FORECAST_TIME between {} and {}".format(self.his_begin_stamp, self.end_stamp)
+        dq = self.exec_sql(sql_dq, engine)
+        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
+        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
+        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
+        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
+        dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
+        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("dq.csv", dq)
+
+    def indep_process(self):
+        """
+        Further processing: resample the tower data to a unified
+        15-minute grid and export per-tower processed CSVs.
+        :return: None
+        """
+        # Wind-tower data processing
+        for i in [self.opt.towerloc]:
+            tower = readData("/tower-{}.csv".format(i))
+            tower = cleaning(tower, 'tower', [self.opt.usable_power["env"]])
+
+            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
+            # 15-minute average resample, keyed on C_TIME
+            tower_ave = tower.resample('15T', on='C_TIME').mean().reset_index()
+            tower_ave = tower_ave.dropna(subset=[self.opt.usable_power['env']])
+            tower_ave.iloc[:, 1:] = tower_ave.iloc[:, 1:].round(2)
+            saveData("/tower-{}-process.csv".format(i), tower_ave)
+
+    def get_process_cdq(self):
+        """
+        Fetch the ultra-short-term forecast results, clean (without
+        de-duplication) and export to cdq.csv.
+        NOTE(review): relies on self.begin_stamp / self.end_stamp, which
+        are commented out in __init__ — confirm they exist.
+        :return: None
+        """
+        engine = self.create_database()
+        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \
+                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        cdq = self.exec_sql(sql_cdq, engine)
+        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
+        cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
+        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
+        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("cdq.csv", cdq)
+
+    def get_process_turbine(self):
+        """
+        Build per-turbine datasets: join cached turbine rows with the
+        raw plant export (filtered to running status), clean key fields,
+        keep 15-minute timestamps, and export one CSV per turbine.
+        :return: None
+        """
+        # Map equipment ids 33..64 to names F01..F31
+        cids = [x for x in range(33, 65, 1)]
+        c_names = ['F01', 'F02', 'F03', 'F04', 'F05', 'F06', 'F07', 'F08', 'F09', 'F10']+['F'+str(x) for x in range(10, 32)]
+        id_names = {id: c_names[x] for x, id in enumerate(cids)}
+        turbines = pd.read_csv('./cache/data/turbines.csv')
+        turbines_ori = pd.read_csv('./cache/data/全场_原始数据_2024-08-01_19-40-25.csv')
+        # Strip any bracketed suffixes from the turbine names
+        turbines_ori['风机'] = turbines_ori['风机'].apply(lambda x: re.sub(r'\(.*?\)|\[.*?\]|\{.*?\}', '', x))
+        turbines_ori.rename(columns={"时间": "C_TIME"}, inplace=True)
+        turbines_ori = turbines_ori[['C_TIME', '风机', '低位首触机组运行状态', '高位首触机组运行状态']]
+        turbines_ori['C_TIME'] = pd.to_datetime(turbines_ori['C_TIME'])
+        turbines['C_TIME'] = pd.to_datetime(turbines['C_TIME'])
+        for number in self.opt.turbineloc:
+            # number = self.opt.usable_power['turbine_id']
+            # Turbine-head data for this equipment number
+            turbine = turbines[turbines['C_EQUIPMENT_NO'] == number]
+            # Keep only timestamps where both status codes indicate running (<= 3)
+            turbine_ori = turbines_ori[(turbines_ori['风机'] == id_names[number]) & (turbines_ori['低位首触机组运行状态'] <=3) & (turbines_ori['高位首触机组运行状态'] <=3)]
+            turbine = pd.merge(turbine, turbine_ori.loc[:, ["C_TIME", "风机"]], on="C_TIME")
+            turbine.drop(columns=['风机'], inplace=True)
+            turbine = key_field_row_cleaning(turbine, cols=['C_WS', 'C_ACTIVE_POWER'])
+            # Keep only quarter-hour timestamps
+            turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
+            # Export all rows directly
+            saveData("turbine-{}.csv".format(number), turbine)
+
+    def process_csv_files(self, input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled
+        """
+        Clean every per-turbine CSV: drop -99 power rows and frozen-value streaks.
+
+        :param input_dir: directory holding turbine-<id>.csv files
+        :param output_dir: directory the cleaned files are written to (created if absent)
+        :param M: NOTE(review): never read in this method -- confirm whether it can be dropped
+        :param N: minimum run length of identical rows treated as a frozen streak
+        :return: None (cleaned CSVs are written to output_dir)
+        """
+        if not os.path.exists(output_dir):
+            os.makedirs(output_dir)
+
+        for i in self.opt.turbineloc:
+            input_file = os.path.join(input_dir, f"turbine-{i}.csv")
+            output_file = os.path.join(output_dir, f"turbine-{i}.csv")
+
+            # Load one turbine's rows.
+            df = pd.read_csv(input_file)
+
+            # Remove outliers and collect per-file removal statistics.
+            df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = self.remove_abnormal_values(df, N)
+
+            # Log the removal statistics (runtime messages intentionally unchanged).
+            self.logger.info(f"处理文件:{input_file}")
+            self.logger.info(f"剔除 -99 点异常值数量:{count_abnormal1}")
+            self.logger.info(f"剔除连续异常值数量:{count_abnormal2}")
+            self.logger.info(f"总共剔除数据量:{total_removed}")
+            self.logger.info(f"剔除的连续异常值具体数值:{removed_continuous_values}\n")
+
+            # Persist the cleaned frame.
+            df_clean.to_csv(output_file, index=False)
+
+    def remove_abnormal_values(self,df, N):
+        """
+        Drop abnormal rows from one turbine frame and report what was removed.
+
+        :param df: turbine dataframe with C_WS, C_WD and C_ACTIVE_POWER columns
+        :param N: minimum length of an unchanged streak to count as abnormal
+        :return: (df_clean, count of -99 rows, count of streak rows,
+                  total rows removed, dict of distinct streak values per column)
+        """
+        # Rows whose active power equals the -99 sentinel are invalid.
+        abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
+        count_abnormal1 = abnormal_mask1.sum()
+
+        # Rows where wind speed, direction and power stay identical for N
+        # consecutive samples are treated as frozen-sensor streaks.
+        columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
+        abnormal_mask2 = self.mark_abnormal_streaks(df, columns, N)
+        count_abnormal2 = abnormal_mask2.sum()
+        # Union of both abnormality kinds.
+        abnormal_mask = abnormal_mask1 | abnormal_mask2
+        # Record the distinct frozen values so callers can log them.
+        removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
+        # Keep only the normal rows.
+        df_clean = df[~abnormal_mask]
+        total_removed = abnormal_mask.sum()
+        return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values
+
+    # —————————————————————————— Accumulate generated power of the turbines in each zone ——————————————————————————
+    def zone_powers(self, input_dir):
+        """
+        Aggregate cleaned per-turbine power into one summed column per zone.
+
+        :param input_dir: directory holding the time-aligned turbine-<id>.csv files
+        :return: None (writes z-power-t.csv via saveData)
+        """
+        z_power = {}
+        for zone, turbines in self.opt.zone.items():
+            # Load every turbine file belonging to this zone.
+            dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv")) for z in self.opt.turbineloc if z in turbines]
+            # NOTE(review): C_TIME is re-assigned on every loop pass from the zone's
+            # first file -- assumes all files share one aligned time axis (TimeMerge).
+            z_power['C_TIME'] = dfs[0]['C_TIME']
+            # Row-wise sum of this zone's active-power columns.
+            sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1)
+            z_power[zone] = sum_power
+        z_power = pd.DataFrame(z_power)
+        # Round every power column (all except C_TIME) to two decimals.
+        z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
+        saveData("z-power-t.csv", z_power)
+
+
+
+    # —————————————————————————— Cleaning of -99 and consecutive frozen values in head wind speed ——————————————————————————
+    def mark_abnormal_streaks(self, df, columns, min_streak):
+        """
+        Flag runs of rows whose values in *columns* all stay identical for at
+        least ``min_streak`` consecutive rows.
+
+        :param df: dataframe to scan; ``df.at`` is addressed with positional
+                   integers, so this assumes a default 0..n-1 RangeIndex -- TODO confirm
+        :param columns: column names that must all be unchanged to extend a streak
+        :param min_streak: minimum streak length considered abnormal
+        :return: boolean Series aligned with df.index, True on abnormal rows
+        """
+        abnormal_mask = pd.Series(False, index=df.index)
+        streak_start = None
+        for i in range(len(df)):
+            # A streak restarts at row 0 or whenever any tracked column changes.
+            if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
+                streak_start = i
+
+            # Once the streak reaches min_streak, mark its trailing window as abnormal.
+            if i - streak_start >= min_streak - 1:
+                abnormal_mask[i - min_streak + 1:i + 1] = True
+
+        return abnormal_mask
+
+    # —————————————————————————— Align individual turbine files on a common time axis ——————————————————————————
+    def TimeMerge(self, input_dir, output_dir, M):
+        """
+        Keep only timestamps present in every turbine file so all CSVs share one
+        time axis, then rewrite the files.
+
+        :param input_dir: directory holding cleaned turbine-<id>.csv files
+        :param output_dir: directory for the aligned files (created if absent)
+        :param M: NOTE(review): never read in this method -- confirm whether it can be dropped
+        :return: None
+        """
+        # Load every turbine's CSV.
+        files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in self.opt.turbineloc]
+        dataframes = [pd.read_csv(f) for f in files]
+
+        # Intersect the C_TIME columns across all files.
+        c_time_intersection = set(dataframes[0]["C_TIME"])
+        for df in dataframes[1:]:
+            c_time_intersection.intersection_update(df["C_TIME"])
+
+        # Keep only rows whose timestamp appears in every file.
+        filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)] for df in dataframes]
+
+        # Write each aligned frame back out.
+        os.makedirs(output_dir, exist_ok=True)
+        for (filtered_df, i) in zip(filtered_dataframes, self.opt.turbineloc):
+            if i == 144:
+                # Turbine 144's power is divided by 1000 -- presumably a unit
+                # mismatch (kW vs MW) in the source data; TODO confirm.
+                filtered_df['C_ACTIVE_POWER'] /= 1000
+            filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
+
+    def data_process(self):
+        """
+        Top-level driver for data export and initial processing.
+
+        Only the power, turbine-head and zone steps are currently active; the
+        remaining export steps are kept commented out for manual use.
+        :return: None
+        """
+        # self.clear_data()
+        self.get_process_power()
+        # self.get_process_dq()
+        # self.get_process_cdq()
+        # self.get_process_NWP()
+        # self.get_process_tower()
+        # self.indep_process()
+        self.get_process_turbine()
+        # Clean per-turbine CSVs (frozen-streak length 5), align time axes, sum per zone.
+        self.process_csv_files('./cache/data', './cache/data', 50, 5)
+        self.TimeMerge('./cache/data', './cache/data', 50)
+        self.zone_powers('./cache/data')
+
+if __name__ == '__main__':
+    # Ad-hoc entry point: build the config and logger, then run the full pipeline.
+    from logs import Log
+    from config import myargparse
+    import matplotlib.pyplot as plt
+    import matplotlib.colors as mcolors
+    import pandas as pd
+
+    args = myargparse(discription="场站端配置", add_help=False)
+    opt = args.parse_args_and_yaml()
+    log = Log().logger
+    # NOTE(review): begin/end are empty strings -- presumably DataBase falls back
+    # to a default export window; confirm before relying on this script.
+    db = DataBase(begin='', end='', opt=opt, logger=log)
+    db.data_process()