
Add second-pass preprocessing for interpolation

liudawei 1 year ago
parent
commit
2f56b72727
3 changed files with 134 additions and 0 deletions
  1. .gitignore (+13 −0)
  2. db-wind/main.py (+2 −0)
  3. interpolation/data_process.py (+119 −0)

+ 13 - 0
.gitignore

@@ -0,0 +1,13 @@
+*/__pycache__
+/__pycache__
+/.idea
+/checkpoint
+/log
+/data
+/figure
+*.log
+*.swp
+/log
+/data
+
+

+ 2 - 0
db-wind/main.py

@@ -76,6 +76,8 @@ def TimeMerge(input_dir, output_dir,M):
     # Write each filtered DataFrame to a new CSV file
     os.makedirs(output_dir, exist_ok=True)
     for (filtered_df, i) in zip(filtered_dataframes, arg.turbineloc):
+        if i == 144:
+            filtered_df['C_ACTIVE_POWER'] /= 1000
         filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
 
 #—————————————————————————— Turbine missing-point handling ——————————————————————————
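The two added lines above rescale C_ACTIVE_POWER for turbine 144 only, presumably because that turbine logs power in W while the rest of the fleet logs kW (an assumption; the commit does not say). A quick way to spot such per-turbine unit mismatches is to compare median magnitudes across the exported CSVs. A minimal sketch, where the output directory and the 100x threshold are illustrative assumptions:

import os
import pandas as pd

output_dir = "data/turbine-csv"  # hypothetical path to the exported CSVs
medians = {}
for fname in sorted(os.listdir(output_dir)):
    if fname.startswith("turbine-") and fname.endswith(".csv"):
        df = pd.read_csv(os.path.join(output_dir, fname))
        medians[fname] = df["C_ACTIVE_POWER"].abs().median()

fleet = pd.Series(medians).median()
for fname, m in medians.items():
    # A median roughly 1000x the fleet's suggests W was logged instead of kW.
    if m > 100 * fleet:
        print(f"{fname}: median {m:.1f} vs fleet median {fleet:.1f} -> check units")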

+ 119 - 0
interpolation/data_process.py

@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/3/17 10:10
+# file: data_process.py
+# author: David
+# company: shenyang JY
+import pandas as pd
+import os
+import numpy as np
+from data_utils import *
+import yaml
+
+
+class data_process(object):
+    def __init__(self, opt):
+        self.std = None
+        self.mean = None
+        self.opt = opt
+
+    # mainly the gap-filling steps after the tables are joined
+    def get_processed_data(self, *args):
+        """
+
+        :param args:
+        :return: the split, gap-filled data, ready to be saved as npy vector files
+        """
+        csv_data_path = self.opt.csv_data_path
+        nwp = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["nwp"]))
+        cluster_power = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["cluter_power"]))
+        # Step 1: join NWP and cluster power on C_TIME
+        unite = pd.merge(nwp, cluster_power, on='C_TIME')
+        # Step 2: compute the interval between consecutive timestamps
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * 10)
+        dfs = self.missing_time_splite(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
+        print("再次测算,需要插值的总点数为:", miss_number)
+        dfs_train, dfs_test = self.data_fill(dfs, [8])
+        self.normalize(dfs_train)  # normalization (computes and stores mean/std)
+        return dfs_train, dfs_test
+
+    def normalize(self, dfs):
+        """
+        C_TIME is left un-normalized for now
+        :param dfs:
+        :return:
+        """
+        df = pd.concat(dfs, axis=0)
+        # df = df.reset_index()
+        # df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
+        mean = np.mean(df.iloc[:, 1:], axis=0)  # per-column mean, skipping C_TIME
+        std = np.std(df.iloc[:, 1:], axis=0)  # per-column standard deviation, skipping C_TIME
+        # if hasattr(self.opt, 'mean') is False or hasattr(self.opt, 'std') is False:
+        #     self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
+        print("归一化参数,均值为:{},方差为:{}".format(mean.to_dict(), std.to_dict()))
+        self.mean, self.std = mean.to_dict(), std.to_dict()
+
+    def missing_time_splite(self, df, dt_short, dt_long):
+        n_long, n_short, n_points = 0, 0, 0
+        start_index = 0
+        dfs = []
+        for i in range(1, len(df)):
+            if df['time_diff'][i] >= dt_long:
+                df_long = df.iloc[start_index:i, :-1]
+                dfs.append(df_long)
+                start_index = i
+                n_long += 1
+            if df['time_diff'][i] > dt_short:
+                print(df['C_TIME'][i-1], end=" ~ ")
+                print(df['C_TIME'][i], end=" ")
+                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
+                print("缺失点数:", points)
+                if df['time_diff'][i] < dt_long:
+                    n_short += 1
+                    n_points += points
+                    print("需要补值的点数:", points)
+        dfs.append(df.iloc[start_index:, :-1])
+        print("数据总数:", len(df), ",时序缺失的间隔:", n_short, "其中,较长的时间间隔:", n_long)
+        print("需要补值的总点数:", n_points)
+        return dfs
+
+    def data_fill(self, dfs, test):
+        dfs_train = []
+        for i, df in enumerate(dfs):
+            df.set_index('C_TIME', inplace=True)
+            dff = df.resample('15T').bfill()
+            dff.reset_index(inplace=True)
+            points = len(dff) - len(df)
+            if i not in test:
+                dfs_train.append(dff)
+                print("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
+            else:
+                print("{} ~ {} 有 {} 个点, 缺失 {} 个点.".format(df.index[0], df.index[-1], len(dff), points))
+        dfs_test = [dfs[t] for t in test]
+        return dfs_train, dfs_test
+
+    def set_yml(self, yml_dict):
+        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
+            cfg = yaml.safe_load(f)
+        for k, v in yml_dict.items():
+            cfg[k] = v
+        with open(self.opt.config_yaml, 'w') as f:
+            yaml.safe_dump(cfg, f, default_flow_style=False)
+
+    def drop_duplicated(self, df):
+        df = df.groupby(level=0).mean()  # de-duplicate a DatetimeIndex by averaging rows with the same timestamp
+        return df
+
+if __name__ == "__main__":
+    # dq = ds.read_data(dq_path, dq_columns)[0]
+    # rp = ds.read_data(rp_path, rp_columns)[0]
+    # # rp_average(rp)    # compute average power
+    # envir = ds.read_data(envir_path, envir_columns)[0]
+    # tables = ds.tables_integra(dq, rp, envir)
+    # ds.tables_norm_result(tables)
+    pass
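For reference, a minimal self-contained sketch of what the pipeline above does: detect gaps with diff(), split wherever a gap is "long", back-fill the remaining short gaps on a 15-minute grid, then z-score the numeric columns. Only the C_TIME column name comes from the commit; the "power" column and the synthetic timestamps are illustrative assumptions.

import numpy as np
import pandas as pd

# Synthetic 15-minute series with one short gap (2 missing points)
# and one long gap (15 missing points).
idx = pd.date_range("2023-03-01 00:00", periods=40, freq="15min")
drop = list(range(10, 12)) + list(range(20, 35))
df = pd.DataFrame({"C_TIME": idx.delete(drop),
                   "power": np.random.rand(40 - len(drop))})

df["time_diff"] = df["C_TIME"].diff()
dt_short = pd.Timedelta(minutes=15)
dt_long = pd.Timedelta(minutes=15 * 10)

# Count the points a back-fill would have to invent (short gaps only).
short = df[(df["time_diff"] > dt_short) & (df["time_diff"] < dt_long)]
print("points to fill:",
      int(short["time_diff"].dt.total_seconds().sum() / 900 - len(short)))  # -> 2

# Split into segments at every long gap; drop the helper column.
segments, start = [], 0
for i in range(1, len(df)):
    if df["time_diff"].iloc[i] >= dt_long:
        segments.append(df.iloc[start:i, :-1])
        start = i
segments.append(df.iloc[start:, :-1])

# Back-fill short gaps inside each segment on a strict 15-minute grid.
filled = [s.set_index("C_TIME").resample("15min").bfill().reset_index()
          for s in segments]

# z-score normalization over all numeric columns of the filled segments.
all_rows = pd.concat(filled, axis=0)
num_cols = all_rows.columns[1:]  # everything except C_TIME
mean, std = all_rows[num_cols].mean(), all_rows[num_cols].std()
for seg in filled:
    seg[num_cols] = (seg[num_cols] - mean) / std

print(len(filled), "segments,", sum(len(s) for s in filled), "rows after filling")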
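The commented-out set_yml call in normalize hints that the mean/std are meant to be persisted into the experiment's YAML config. A sketch of that read-modify-write round-trip, with a hypothetical config path and example values:

import yaml

cfg_path = "config.yaml"  # hypothetical path
norm_params = {"mean": {"power": 0.48}, "std": {"power": 0.29}}  # example values

# Load the existing config, overwrite the normalization keys, dump it back,
# mirroring what set_yml does above.
with open(cfg_path, "r", encoding="utf-8") as f:
    cfg = yaml.safe_load(f) or {}
cfg.update(norm_params)
with open(cfg_path, "w", encoding="utf-8") as f:
    yaml.safe_dump(cfg, f, default_flow_style=False)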