#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: main.py
# author: David
# company: shenyang JY
import pandas as pd
import os
import numpy as np
from data_utils import *
import yaml


class data_process(object):
    def __init__(self, opt):
        self.std = {}
        self.mean = {}
        self.opt = opt

    # Mainly handles gap-filling after the datasets have been joined
    def get_train_data(self):
        """
        Read, clean and normalize the NWP training data.
        :return: a DataFrame of cleaned, normalized training features
        """
        csv_data_path = self.opt.excel_data_path
        data_train = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["nwp"]))
        data_train['C_TIME'] = pd.to_datetime(data_train['C_TIME'])
        data_train = self.data_cleaning(data_train)
        # power['C_TIME'] = pd.to_datetime(power['C_TIME'])
        envir_cols = ['C_TIME', 'C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_AIRT', 'C_CELLT']  # currently unused
        # envir_cols = ['C_TIME', 'C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_OBLIQUER']
        # envir = envir.drop(labels='C_SYNC_TIME', axis=1)
        # Step 1: join -- the MBD environment data should not be joined here
        # unite = pd.merge(unite, envir, on='C_TIME')
        # Step 2: compute the sampling gaps
        # data_train['C_TIME'] = pd.to_datetime(data_train['C_TIME'])
        # data_train['time_diff'] = data_train['C_TIME'].diff()
        # dt_short = pd.Timedelta(minutes=15)
        # dt_long = pd.Timedelta(minutes=15 * 10)
        # dfs = self.missing_time_splite(data_train, dt_short, dt_long)
        # miss_points = data_train[(data_train['time_diff'] > dt_short) & (data_train['time_diff'] < dt_long)]
        # miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
        # print("Re-check: total number of points to interpolate:", miss_number)
        # dfs_train, dfs_test = self.data_fill(dfs)
        # from sklearn.model_selection import train_test_split
        # data_train, data_test = train_test_split(data_train, test_size=(1-self.opt.train_data_rate),
        #                                          random_state=self.opt.Model["random_seed"],
        #                                          shuffle=self.opt.shuffle_train_data)
        # envir = envir.iloc[:, :-1]
        data_train = self.norm_features(data_train, ['C_TIME'])
        self.feature_columns(data_train)
        return data_train

    def get_test_data(self):
        data_test = pd.read_csv(os.path.join(self.opt.excel_data_path, self.opt.data_format["test"]))
        data_test['C_TIME'] = pd.to_datetime(data_test['C_TIME'])
        data_test = self.data_cleaning(data_test)
        # envir_cols = ['C_TIME', 'C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_OBLIQUER']
        # envir = envir.iloc[:, :-1]
        # drop_cols = ['C_TEMPERATURE120', 'C_TEMPERATURE130', 'C_TEMPERATURE140',
        #              'C_TEMPERATURE150', 'C_TEMPERATURE160', 'C_TEMPERATURE170',
        #              'C_TEMPERATURE180', 'C_TEMPERATURE190', 'C_TEMPERATURE200',
        #              'C_DIRECTION120', 'C_DIRECTION130', 'C_DIRECTION140',
        #              'C_DIRECTION150', 'C_DIRECTION160', 'C_DIRECTION170',
        #              'C_DIRECTION180', 'C_DIRECTION190', 'C_DIRECTION200',
        #              'C_SPEED120', 'C_SPEED130', 'C_SPEED140', 'C_SPEED150',
        #              'C_SPEED160', 'C_SPEED170', 'C_SPEED180',
        #              'C_SPEED190', 'C_SPEED200']
        preserve = ['C_TIME', 'C_RADIATION', 'C_SURFACE_PRESSURE', 'C_HUMIDITY2', 'C_TEMPERATURE2',
                    'C_TEMPERATURE10', 'C_TEMPERATURE30', 'C_TEMPERATURE50', 'C_TEMPERATURE70',
                    'C_TEMPERATURE80', 'C_TEMPERATURE90', 'C_TEMPERATURE110', 'C_DIRECTION10',
                    'C_DIRECTION30', 'C_DIRECTION50', 'C_DIRECTION70', 'C_DIRECTION80',
                    'C_DIRECTION90', 'C_DIRECTION110', 'C_SPEED10', 'C_SPEED30', 'C_SPEED50',
                    'C_SPEED70', 'C_SPEED80', 'C_SPEED90', 'C_SPEED110', 'C_DNI_CALCD',
                    'C_SOLAR_ZENITH', 'C_CLEARSKY_GHI', 'C_LCC', 'C_MCC', 'C_HCC', 'C_TCC',
                    'C_TPR', 'col1_power', 'col2_power', 'sum_power', 'C_VALUE']
        data_test = data_test.loc[:, preserve]
        data_test = self.norm_features(data_test, ['C_TIME', 'sum_power', 'C_VALUE'])
        self.feature_columns(data_test)
        return data_test

    def feature_columns(self, data):
        # LSTM features are the contiguous block of columns C_RADIATION .. C_TPR
        self.opt.columns_lstm = list(data.loc[:, 'C_RADIATION':'C_TPR'])
        self.opt.input_size_lstm = len(self.opt.columns_lstm)
        self.opt.input_size_cnn = 1
        # Note: opt.columns_cnn is never set here; it is assumed to come from the config
        print("cnn:", self.opt.columns_cnn)
        print("lstm:", self.opt.columns_lstm)
        print("opt column names configured!", self.opt.columns_cnn, self.opt.columns_lstm)

    def norm_features(self, data, drop_list):
        # Z-score normalization of every column not listed in drop_list,
        # using the per-column mean/std supplied by the config
        data1_ = data.drop(drop_list, axis=1, errors='ignore')
        columns = list(data1_.columns)
        mean = np.array([self.opt.mean[col] for col in columns])
        std = np.array([self.opt.std[col] for col in columns])
        new = (data1_ - mean) / std  # broadcasts column-wise over all rows
        for col in new.columns:
            data[col] = new[col]
        return data
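
    # A minimal sketch of what norm_features computes, with hypothetical
    # statistics (opt.mean / opt.std are assumed to map column name -> float):
    #   df = pd.DataFrame({'C_RADIATION': [100.0, 200.0]})
    #   opt.mean = {'C_RADIATION': 150.0}; opt.std = {'C_RADIATION': 50.0}
    #   norm_features(df, ['C_TIME'])  ->  C_RADIATION becomes [-1.0, 1.0]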

    def data_cleaning(self, data, clean_value=[-9999.0, -99]):
        # Replace sentinel values with NaN
        for val in clean_value:
            data = data.replace(val, np.nan)
        # Drop columns with more than 20% NaN (thresh keeps columns with
        # at least 80% non-NaN values)
        data = data.dropna(axis=1, thresh=len(data)*0.8)
        # Drop constant columns (every value equals the first row)
        data = data.loc[:, (data != data.iloc[0]).any()]
        # Back-fill the remaining NaNs
        data = data.interpolate(method='bfill')
        return data
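
    # For illustration (hypothetical frame): a column [-9999.0, 1.0, 2.0]
    # becomes [NaN, 1.0, 2.0] and the NaN is back-filled to 1.0, while a
    # constant column such as [5, 5, 5] is dropped entirely.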

    def missing_time_splite(self, df, dt_short, dt_long):
        # Split df into segments wherever the time gap reaches dt_long,
        # and count the points that shorter gaps would need infilled
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                print(df['C_TIME'][i-1], end=" ~ ")
                print(df['C_TIME'][i], end=" ")
                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
                print("missing points:", points)
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    print("points to fill:", points)
        dfs.append(df.iloc[start_index:, :-1])
        print("total rows:", len(df), ", gaps in the time series:", n_short, ", of which long gaps:", n_long)
        print("total points to fill:", n_points)
        return dfs
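
    # Sketch with the values from the commented-out call in get_train_data
    # (dt_short = 15 min, dt_long = 150 min): a 45-min gap stays inside the
    # current segment and contributes 45*60/(60*15) - 1 = 2 points to
    # n_points, while a 3-hour gap closes the segment and starts a new one.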

    def data_fill(self, dfs, test):
        # Resample each segment onto a 15-minute grid and back-fill the holes;
        # segments whose index is in `test` are kept unfilled as test data
        dfs_train, dfs_test, inserts = [], [], 0
        for i, df in enumerate(dfs):
            df1 = df.set_index('C_TIME', inplace=False)
            dff = df1.resample('15T').bfill()
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            if i not in test:
                if i == 0:
                    dff = dff.iloc[8:, :].reset_index(drop=True)  # drop the first 8 rows of the first segment
                dfs_train.append(dff)
                print("{} ~ {} has {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
                inserts += points
            else:
                print("{} ~ {} has {} points, {} missing. (test set)".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
                dfs_test.append(dfs[i].reset_index(drop=True))
        print("training data was split into {} segments".format(len(dfs_train)))
        return dfs_train, dfs_test
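
    # Toy example of the resample/bfill above (hypothetical timestamps):
    # rows at 00:00 and 00:45 resample to 00:00/00:15/00:30/00:45, the two
    # inserted rows copy the 00:45 values, so points = 4 - 2 = 2.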

    def drop_duplicated(self, df):
        df = df.groupby(level=0).mean()  # deduplicate the DatetimeIndex by averaging duplicate rows
        return df
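
    # Example (hypothetical): two rows both indexed 2023-03-17 10:00 with
    # values 1.0 and 3.0 collapse into one row with value 2.0.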

    def read_data(self, path, cols=None, index_col=None):
        init_data = pd.read_excel(path, usecols=cols, index_col=index_col, engine='openpyxl')
        return init_data


if __name__ == "__main__":
    from config import myargparse
    parse = myargparse(discription="training config", add_help=False)
    opt = parse.parse_args_and_yaml()
    ds = data_process(opt=opt)
    path = ds.opt.excel_data_path
    # Make sure the output directory, including the 'process' subfolder used below, exists
    os.makedirs(os.path.join(path, 'process'), exist_ok=True)
    excel_data_path = ds.opt.excel_data_path
    data_format = ds.opt.data_format
    # dq_path = excel_data_path + data_format["dq"].replace('.csv', '.xlsx')
    rp_path = excel_data_path + data_format["rp"].replace('.csv', '.xlsx')
    nwp_path = excel_data_path + data_format["nwp"].replace('.csv', '.xlsx')
    envir_path = excel_data_path + data_format["envir"].replace('.csv', '.xlsx')
    rp_columns = ['C_TIME', 'C_REAL_VALUE']  # TODO: csv strings are single-quoted, so read_csv may return names like "'C_TIME'"
    nwp = ds.read_data(nwp_path)  # TODO: export csv in table-column order; read_csv reads in csv-column order
    nwp = ds.data_cleaning(nwp)
    nwp.drop(['C_SYNC_TIME'], axis=1, inplace=True)
    # nwp.set_index('C_TIME', inplace=True)
    # nwp = ds.drop_duplicated(nwp)
    envir = ds.read_data(envir_path)  # TODO: export csv in table-column order; read_csv reads in csv-column order
    envir = ds.data_cleaning(envir)
    # envir.set_index('C_TIME', inplace=True)
    # envir.drop(['C_WS_NO', 'C_SYNC_TIME'])
    # envir = ds.drop_duplicated(envir)
    rp = ds.read_data(rp_path, rp_columns)
    # rp.set_index('C_TIME', inplace=True)  # NaN can also serve as an index value
    rp = ds.data_cleaning(rp)
    # rp = ds.drop_duplicated(rp)
    dataframes = [nwp, rp, envir]
    # Find the latest start time and the earliest end time across the three frames
    max_start_time = max(df['C_TIME'].min() for df in dataframes)
    min_end_time = min(df['C_TIME'].max() for df in dataframes)
    print(max_start_time)
    print(min_end_time)
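    # Illustration of the intersection (hypothetical spans):
    #   nwp   2023-01-01 .. 2023-06-30
    #   rp    2023-02-01 .. 2023-07-15
    #   envir 2023-01-15 .. 2023-06-01
    #   -> max_start_time = 2023-02-01, min_end_time = 2023-06-01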
    # Trim every DataFrame to the shared [max_start_time, min_end_time] window
    for i, df in enumerate(dataframes):
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # make sure the time column is datetime
        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
        dataframes[i] = df_filtered.reset_index(drop=True)
    # Write the trimmed frames into the 'process' subfolder
    dataframes[0].to_csv(os.path.join(path, 'process', 'nwp.csv'), index=False)
    dataframes[2].to_csv(os.path.join(path, 'process', 'envir.csv'), index=False)
    dataframes[1].to_csv(os.path.join(path, 'process', 'rp.csv'), index=False)