David 5 months ago
parent
commit
3fcb81106e

+ 3 - 0
.idea/.gitignore

@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml

+ 81 - 0
pre_processing/data_cleaning.py

@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/10/11 11:00
+# file: data_cleaning.py
+# author: David
+# company: shenyang JY
+import numpy as np
+np.random.seed(42)
+
+def cleaning(df, name, logger, cols=None, dup=True):
+    logger.info("开始清洗:{}……".format(name))
+    data = df.copy()
+    data = data_column_cleaning(data, logger)
+    if dup:
+        data = rm_duplicated(data, logger)
+    if cols is not None:
+        data = key_field_row_cleaning(data, cols, logger)
+    return data
+
+
+def data_column_cleaning(data, logger, clean_value=(-99.0, -99)):
+    """
+    列的清洗
+    :param data:
+    :param logger:
+    :param clean_value:
+    :return:
+    """
+    data1 = data.copy()
+    cols_pre = data.columns.to_list()
+    for val in clean_value:
+        data1 = data1.replace(val, np.nan)
+    # drop columns with less than 80% non-NaN values
+    data1 = data1.dropna(axis=1, thresh=len(data) * 0.8)
+    # drop columns whose values are all identical
+    data1 = data1.loc[:, (data1 != data1.iloc[0]).any()]
+    data = data[data1.columns.tolist()]
+    cols_late = data.columns.tolist()
+    if len(cols_pre) > len(cols_late):
+        logger.info("列清洗:清洗的列有:{}".format(set(cols_pre) - set(cols_late)))
+    return data
+
+
+def key_field_row_cleaning(data, cols, logger):
+    """
+    行的重要字段清洗: 过滤含有- 99的数字,过滤空值
+    :param data:
+    :param cols: 指定的字段列表
+    :param logger:
+    :return:
+    """
+    rows_pre = len(data)
+    nan_cols = []
+    for col in cols:
+        begin = len(data)
+        if col in data.columns.tolist():
+            # data = data[~((data.loc[:, col] < 0) & (data.loc[:, col].astype(str).str.contains('99')))]
+            data = data[~(data[col] == -99)]
+            data = data[~data.loc[:, col].isnull()]
+        end = len(data)
+        if begin - end > 0:  # rows were removed for this column
+            nan_cols.append(col)
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        logger.info("行清洗:清洗的行数有:{},缺失的列有:{}".format(rows_pre-rows_late, ', '.join(nan_cols)))
+    return data
+
+def rm_duplicated(data, logger):
+    """
+    按照时间去重
+    :param data:
+    :param logger:
+    :return:
+    """
+    # keep the first row for each C_TIME value
+    rows_pre = len(data)
+    data = data.drop_duplicates(subset='C_TIME')
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        logger.info("时间去重的行数有:{}".format(rows_pre - rows_late))
+    return data
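
A minimal usage sketch for the cleaning pipeline above (not part of the commit; the sample DataFrame and the stdlib logger are assumptions for illustration):

import logging
import pandas as pd
from pre_processing.data_cleaning import cleaning

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cleaning-demo")

# hypothetical station data: -99 marks a missing reading, 00:45 is duplicated
df = pd.DataFrame({
    "C_TIME": ["00:00", "00:15", "00:30", "00:45", "00:45"],
    "C_REAL_VALUE": [10.5, -99, 12.0, 11.8, 11.8],
})
cleaned = cleaning(df, "demo-station", logger, cols=["C_REAL_VALUE"])
print(cleaned)  # duplicate timestamp and the -99 row are gone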

+ 97 - 0
pre_processing/data_filling.py

@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2024/5/6 13:52
+# file: data_filling.py
+# author: David
+# company: shenyang JY
+import os
+import numpy as np
+import pandas as pd
+from pre_processing.data_cleaning import rm_duplicated
+np.random.seed(42)
+
+
+class DataProcess(object):
+    def __init__(self, log, args):
+        self.logger = log
+        self.args = args
+        self.opt = self.args.parse_args_and_yaml()
+
+    # Fill in missing values after the datasets are joined
+    def get_train_data(self, unite, envir):
+        # unite = pd.merge(unite, envir, on='C_TIME')
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_train = self.missing_time_split(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
+        self.logger.info("再次测算,需要插值的总点数为:{}".format(miss_number))
+        if miss_number > 0 and self.opt.Model["train_data_fill"]:
+            data_train = self.data_fill(data_train)
+        return data_train, envir
+
+    def get_predict_data(self, nwp, dq):
+        if self.opt.Model["predict_data_fill"] and len(dq) > len(nwp):
+            self.logger.info("接口nwp和dq合并清洗后,需要插值的总点数为:{}".format(len(dq)-len(nwp)))
+            nwp.set_index('C_TIME', inplace=True)
+            dq.set_index('C_TIME', inplace=True)
+            nwp = nwp.resample('15T').interpolate(method='linear') # linearly interpolate nwp first
+            nwp = nwp.reindex(dq.index, method='bfill') # second pass: back-fill points before the leading edge of nwp
+            nwp = nwp.ffill() # forward-fill points past the trailing edge
+            nwp.reset_index(drop=False, inplace=True)
+            dq.reset_index(drop=False, inplace=True)
+        return nwp
+
+    def get_test_data(self, unite, envir):
+        # Step 2: compute the time gaps
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_test = self.missing_time_split(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
+        self.logger.info("再次测算,需要插值的总点数为:{}".format(miss_number))
+        if self.opt.Model["test_data_fill"] and miss_number > 0:
+            data_test = self.data_fill(data_test, test=True)
+        return data_test, envir
+
+    def missing_time_split(self, df, dt_short, dt_long):
+        n_long, n_short, n_points = 0, 0, 0
+        start_index = 0
+        dfs = []
+        for i in range(1, len(df)):
+            if df['time_diff'][i] >= dt_long:
+                df_long = df.iloc[start_index:i, :-1]
+                dfs.append(df_long)
+                start_index = i
+                n_long += 1
+            if df['time_diff'][i] > dt_short:
+                self.logger.info(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
+                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
+                self.logger.info("缺失点数:{}".format(points))
+                if df['time_diff'][i] < dt_long:
+                    n_short += 1
+                    n_points += points
+                    self.logger.info("需要补值的点数:{}".format(points))
+        dfs.append(df.iloc[start_index:, :-1])
+        self.logger.info(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}")
+        self.logger.info("需要补值的总点数:{}".format(n_points))
+        return dfs
+
+    def data_fill(self, dfs, test=False):
+        dfs_fill, inserts = [], 0
+        for i, df in enumerate(dfs):
+            df = rm_duplicated(df, self.logger)
+            df1 = df.set_index('C_TIME', inplace=False)
+            dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation; other fill methods still need comparison
+            dff.reset_index(inplace=True)
+            points = len(dff) - len(df1)
+            dfs_fill.append(dff)
+            self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
+            inserts += points
+        name = "预测数据" if test is True else "训练集"
+        self.logger.info("{}分成了{}段,实际一共补值{}点".format(name, len(dfs_fill), inserts))
+        return dfs_fill
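
A standalone sketch of the 15-minute fill pattern that data_fill applies per segment (the sample frame is an assumption; rm_duplicated and logging are omitted for brevity):

import pandas as pd

# hypothetical segment with one 30-minute gap: the 00:30 point is missing
df = pd.DataFrame({
    "C_TIME": pd.to_datetime(["2024-05-06 00:00", "2024-05-06 00:15", "2024-05-06 00:45"]),
    "C_REAL_VALUE": [10.0, 12.0, 16.0],
})
df1 = df.set_index("C_TIME")
dff = df1.resample("15T").interpolate(method="linear")  # inserts 00:30 with value 14.0
dff.reset_index(inplace=True)
print("filled {} point(s)".format(len(dff) - len(df1)))  # filled 1 point(s)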

+ 0 - 0
data_processing/processing_limit_power/logs.py → processing_limit_power/logs.py


+ 0 - 0
data_processing/processing_limit_power/processing_limit_power_by_agcavc.py → processing_limit_power/processing_limit_power_by_agcavc.py


+ 0 - 0
data_processing/processing_limit_power/processing_limit_power_by_machines.py → processing_limit_power/processing_limit_power_by_machines.py


+ 0 - 0
data_processing/processing_limit_power/processing_limit_power_by_records.py → processing_limit_power/processing_limit_power_by_records.py


+ 0 - 0
data_processing/processing_limit_power/processing_limit_power_by_solar.py → processing_limit_power/processing_limit_power_by_solar.py


+ 0 - 0
data_processing/processing_limit_power/processing_limit_power_by_statistics_light.py → processing_limit_power/processing_limit_power_by_statistics_light.py


+ 0 - 0
data_processing/processing_limit_power/processing_limit_power_by_wind.py → processing_limit_power/processing_limit_power_by_wind.py