#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: main.py
# author: David
# company: shenyang JY
import pandas as pd
import numpy as np
from data_utils import *
import yaml
class data_process(object):
    """Load, clean, align and normalize power-forecast time series.

    Reads the "dq" (forecast) and "rp" (real power) csv exports configured
    in ``opt``, parses their timestamps, cleans and de-duplicates them,
    inner-joins them on a 15-minute DatetimeIndex, splits the result into
    contiguous segments and records per-column mean/std back into the yaml
    config for later de-normalization.
    """

    def __init__(self, opt):
        # Normalization statistics (dicts keyed by column name);
        # populated by norm().
        self.std = None
        self.mean = None
        self.opt = opt

    def get_processed_data(self):
        """Read, clean and unite the csv sources.

        Returns:
            list[pd.DataFrame]: one frame per contiguous time segment
            (segments split wherever the data gap is one day or more),
            each resampled onto a regular 15-minute grid.
        """
        excel_data_path = self.opt.excel_data_path
        data_format = self.opt.data_format
        dq_path = excel_data_path + data_format["dq"]
        rp_path = excel_data_path + data_format["rp"]
        dq_columns = ['C_FORECAST_TIME', 'C_FP_VALUE']
        rp_columns = ['C_TIME', 'C_REAL_VALUE']

        # Both sources go through the same pipeline:
        # parse timestamps -> index by time -> clean -> de-duplicate.
        dq = self.read_data(dq_path, dq_columns)
        dq = dq.rename(columns={"C_FORECAST_TIME": "C_TIME"})
        dq["C_TIME"] = dq["C_TIME"].apply(timestr_to_datetime)
        dq.set_index('C_TIME', inplace=True)
        dq = self.data_cleaning(dq)
        dq = self.drop_duplicated(dq)

        rp = self.read_data(rp_path, rp_columns)
        rp["C_TIME"] = rp["C_TIME"].apply(timestr_to_datetime)
        rp.set_index('C_TIME', inplace=True)  # NaN values are allowed in an index
        rp = self.data_cleaning(rp)
        rp = self.drop_duplicated(rp)

        df = self.tables_unite(rp, dq)
        dfs = self.missing_time_splite(df)
        dfs = [self.data_fill(segment) for segment in dfs]
        self.norm(dfs)  # record normalization statistics
        return dfs

    def norm(self, dfs):
        """Compute per-column mean/std over all segments and persist them.

        The statistics are written to the yaml config only when ``opt``
        does not already carry ``mean``/``std`` attributes, so a saved
        config takes precedence over recomputation.
        """
        df = pd.concat(dfs, axis=0)
        df = df.reset_index()
        # Serialize timestamps to strings so C_TIME is excluded from the
        # numeric statistics below.
        df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
        # numeric_only avoids the string column; the original used
        # np.mean/np.std on the whole frame, which is deprecated/raises in
        # modern pandas.  ddof=0 keeps np.std's population-std semantics.
        mean = df.mean(numeric_only=True)
        std = df.std(ddof=0, numeric_only=True)
        if not (hasattr(self.opt, 'mean') and hasattr(self.opt, 'std')):
            self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
        self.mean, self.std = mean.to_dict(), std.to_dict()

    def data_cleaning(self, data):
        """Clean a raw DataFrame in its original (physical) units.

        Steps: treat -99 as missing, drop columns with more than 30%
        missing values, zero-fill remaining gaps, and drop columns whose
        value never changes.
        """
        data = data.replace(-99, np.nan)
        # Drop any column with more than 30% NaN (thresh = minimum count
        # of non-NaN values a column must have to be kept).
        data = data.dropna(axis=1, thresh=len(data) * 0.7)
        # Zero-fill remaining gaps.  NOTE(review): 0 may not be a neutral
        # value for a convolutional model — confirm downstream.
        data = data.replace(np.nan, 0)
        # Drop constant columns: they carry no information.
        data = data.loc[:, (data != data.iloc[0]).any()]
        return data

    def missing_time_splite(self, df):
        """Split ``df`` (DatetimeIndex) into contiguous segments.

        A new segment starts wherever the gap between consecutive
        timestamps is one day or more; every gap larger than the
        15-minute sampling step is printed for inspection.
        """
        step = pd.Timedelta(minutes=15)
        one_day = pd.Timedelta(days=1)
        gap_count = 0
        split_count = 0
        segment_start = 0
        segments = []
        for i in range(1, len(df)):
            delta = df.index[i] - df.index[i - 1]
            if delta >= one_day:
                segments.append(df.iloc[segment_start:i, ])
                segment_start = i
                split_count += 1
            if delta != step:
                print(df.index[i - 1], end=" ~ ")
                print(df.index[i])
                gap_count += 1
        segments.append(df.iloc[segment_start:, ])
        print("数据总数:", len(df), ",缺失段数:", gap_count, "其中,超过一天的段数:", split_count)
        return segments

    def data_fill(self, df):
        """Resample onto a regular 15-minute grid, back-filling gaps.

        '15min' is the supported spelling of the deprecated '15T' alias.
        """
        return df.resample('15min').bfill()

    def set_yml(self, yml_dict):
        """Merge ``yml_dict`` into the yaml config file on disk."""
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        cfg.update(yml_dict)
        # Explicit encoding: the original wrote with the platform default,
        # which can corrupt non-ASCII keys on some systems.
        with open(self.opt.config_yaml, 'w', encoding='utf-8') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)

    def read_data(self, path, cols=None, index_col=None):
        """Thin wrapper over pd.read_csv restricted to ``cols``."""
        return pd.read_csv(path, usecols=cols, index_col=index_col)

    def filter_data(self):
        """Drop rows whose real power (column 2) is exactly 0.

        Zero output means the plant generated nothing.  Operates on the
        ndarray ``self.tables`` and returns the filtered array.
        """
        # Keeping the nonzero rows directly is equivalent to the original
        # set-difference + np.delete, and preserves row order.
        keep = np.nonzero(self.tables[:, 2])[0]
        self.tables = self.tables[keep]
        return self.tables

    def drop_duplicated(self, df):
        """Collapse duplicate timestamps by averaging (index-level dedup)."""
        return df.groupby(level=0).mean()

    def tables_unite(self, t1, t2):
        """Inner-join two frames on their (time) index."""
        return pd.merge(t1, t2, left_index=True, right_index=True)
if __name__ == "__main__":
    # BUG(review): the original called ``DataSet()``, a name defined
    # nowhere in this file — the class here is ``data_process`` and it
    # requires an ``opt`` config object — so running the module raised a
    # NameError.  All remaining statements in the guard were commented
    # out, so this is left as an explicit stub until a real entry point
    # (config loading + data_process(opt).get_processed_data()) exists.
    pass