#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: data_process.py
# author: David
# company: shenyang JY
import pandas as pd
import numpy as np
from data_utils import *
import yaml
class data_process(object):
    def __init__(self, opt):
        self.std = None
        self.mean = None
        self.opt = opt
        # All computations are done on raw (un-normalized) ndarray values.
        # self.norm_data = (self.tables[:, 1:] - self.mean) / self.std  # normalize, remove units
        # self.norm_data.insert(0, 'C_TIME', self.tables['C_TIME'])
        # self.set_yml({'mean': self.mean.to_dict(), 'std': self.std.to_dict()})
        # self.start_num_in_test = 0
    def get_processed_data(self):
        excel_data_path = self.opt.excel_data_path
        data_format = self.opt.data_format
        dq_path = excel_data_path + data_format["dq"]
        rp_path = excel_data_path + data_format["rp"]
        nwp_path = excel_data_path + data_format["nwp"]
        envir_path = excel_data_path + data_format["envir"]
        dq_columns = ['C_FORECAST_TIME', 'C_FP_VALUE']
        rp_columns = ['C_TIME', 'C_REAL_VALUE']  # TODO: may need ["'C_TIME'", "'C_REAL_VALUE'"]; the exported CSV wraps strings in single quotes and read_csv keeps them
        nwp = self.read_data(nwp_path).loc[:, "C_PRE_TIME":]  # TODO: the CSV is exported in the table's column order; read_csv reads in CSV column order
        nwp = self.data_cleaning(nwp)
        nwp.drop(['C_FARM_ID', 'C_SC_DATE', 'C_SC_TIME', 'C_PRE_DATE'], axis=1, inplace=True)
        nwp["C_PRE_TIME"] = nwp["C_PRE_TIME"].apply(timestr_to_datetime)
        nwp.rename({"C_PRE_TIME": "C_TIME"}, axis=1, inplace=True)
        nwp.set_index('C_TIME', inplace=True)
        nwp = self.drop_duplicated(nwp)
        envir = self.read_data(envir_path).loc[:, "C_TIME":]  # TODO: same column-order caveat as for nwp above
        envir = self.data_cleaning(envir)
        envir["C_TIME"] = envir["C_TIME"].apply(timestr_to_datetime)
        envir.set_index('C_TIME', inplace=True)
        envir = self.drop_duplicated(envir)
        rp = self.read_data(rp_path, rp_columns)
        rp["C_TIME"] = rp["C_TIME"].apply(timestr_to_datetime)
        rp.set_index('C_TIME', inplace=True)  # NaN values can still be set in the index column
        rp = self.data_cleaning(rp)
        rp = self.drop_duplicated(rp)
        df = self.tables_unite(rp, envir)
        df = self.tables_unite(df, nwp)
        dfs = self.missing_time_splite(df)
        dfs = [self.data_fill(df) for df in dfs]
        self.norm(dfs)  # normalization, TODO: still to be finalized
        return dfs
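    # Shape of the result, from the steps above: a list of DataFrames, one per
    # contiguous stretch of data, each indexed by C_TIME on a regular 15-minute
    # grid after data_fill(), with the rp, envir and nwp columns merged on
    # timestamp.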
    def norm(self, dfs):
        df = pd.concat(dfs, axis=0)
        df = df.reset_index()
        df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
        mean = np.mean(df, axis=0)  # column means
        std = np.std(df, axis=0)  # column standard deviations
        if not hasattr(self.opt, 'mean') or not hasattr(self.opt, 'std'):
            self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
            print("Normalization parameters: mean = {}, std = {}".format(mean.to_dict(), std.to_dict()))
        self.mean, self.std = mean.to_dict(), std.to_dict()
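    # Sketch (an assumption, not part of the original module): one way the
    # statistics computed by norm() could be applied to z-score a DataFrame.
    # Columns without saved statistics, or with a zero std, are left unchanged.
    def apply_norm_sketch(self, df):
        # assumes norm() has already populated self.mean and self.std
        df = df.copy()
        for col in df.columns:
            if col in self.mean and self.std.get(col):
                df[col] = (df[col] - self.mean[col]) / self.std[col]
        return df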
    def data_cleaning(self, data):
        data = data.replace(-99, np.nan)
        # drop columns with too many NaNs (keep only columns that are at least 80% non-NaN)
        data = data.dropna(axis=1, thresh=len(data)*0.8)
        # drop columns whose values are all identical
        data = data.loc[:, (data != data.iloc[0]).any()]
        # replace remaining NaNs with 0; open question: is 0 an appropriate fill value for convolution-based learning?
        data = data.replace(np.nan, 0)
        return data
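    # Worked example of the thresh rule above (illustrative numbers): with
    # len(data) == 96 rows, thresh = 96 * 0.8 = 76.8, so a column survives only
    # if it has at least 77 non-NaN values, i.e. a NaN share below roughly 20%.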
    def missing_time_splite(self, df):
        dt = pd.Timedelta(minutes=15)
        day1 = pd.Timedelta(days=1)
        cnt = 0
        cnt1 = 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df.index[i] - df.index[i-1] >= day1:
                df_x = df.iloc[start_index:i, ]
                dfs.append(df_x)
                start_index = i
                cnt1 += 1
            if df.index[i] - df.index[i-1] != dt:
                print(df.index[i-1], end=" ~ ")
                print(df.index[i])
                cnt += 1
        dfs.append(df.iloc[start_index:, ])
        print("Total rows:", len(df), ", missing segments:", cnt, ", of which longer than one day:", cnt1)
        return dfs
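    # Example (illustrative timestamps): if 15-minute data jumps from
    # 2023-03-01 00:00 to 2023-03-02 06:00, the gap is >= 1 day, so the rows
    # before the gap become their own segment and a new one starts at
    # 2023-03-02 06:00. Shorter gaps are only logged here and are later
    # filled in by data_fill().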
    def data_fill(self, df):
        # resample onto a regular 15-minute grid and backfill the missing rows
        df = df.resample('15T').bfill()
        return df
    def set_yml(self, yml_dict):
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        for k, v in yml_dict.items():
            cfg[k] = v
        with open(self.opt.config_yaml, 'w', encoding='utf-8') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)
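    # After set_yml({'mean': ..., 'std': ...}) the config YAML gains top-level
    # keys shaped like this (values are illustrative):
    #
    #   mean:
    #     C_REAL_VALUE: 12.3
    #   std:
    #     C_REAL_VALUE: 4.5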
    def read_data(self, path, cols=None, index_col=None):
        init_data = pd.read_csv(path, usecols=cols, index_col=index_col)
        return init_data
    def filter_data(self):
        # note: relies on self.tables (an ndarray) being assigned elsewhere; it is not set in __init__
        check_table = self.tables[:, 2]  # actual power must not be 0; a value of 0 means no generation
        preserve_index = list(np.nonzero(check_table)[0])
        indexes = list(range(len(self.tables)))
        del_index = list(set(indexes) - set(preserve_index))
        self.tables = np.delete(self.tables, del_index, axis=0)
        return self.tables
    def drop_duplicated(self, df):
        df = df.groupby(level=0).mean()  # deduplicate the DatetimeIndex by averaging rows with the same timestamp
        return df
    def tables_unite(self, t1, t2):
        # index-on-index merge; pd.merge defaults to an inner join, so only timestamps present in both tables are kept
        return pd.merge(t1, t2, left_index=True, right_index=True)
if __name__ == "__main__":
    # NOTE: `DataSet` is not defined in this module; the class above is
    # `data_process`, which expects an `opt` object. See the sketch below.
    # ds = DataSet()
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp)  # compute average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
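    # A minimal, hypothetical driver (the field names mirror how `opt` is used
    # above; the paths and file names are placeholders, not from the project):
    from types import SimpleNamespace
    opt = SimpleNamespace(
        excel_data_path="./data/",
        data_format={"dq": "dq.csv", "rp": "rp.csv",
                     "nwp": "nwp.csv", "envir": "envir.csv"},
        config_yaml="./config.yml",
    )
    dp = data_process(opt)
    dfs = dp.get_processed_data()
    print("number of contiguous segments:", len(dfs))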