data_process.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: main.py
# author: David
# company: shenyang JY
import pandas as pd
import numpy as np
from data_utils import *
import yaml


class data_process(object):

    def __init__(self, opt):
        self.std = None
        self.mean = None
        self.opt = opt
        # All statistics are computed on the raw ndarray scale.
        # self.norm_data = (self.tables[:, 1:] - self.mean) / self.std  # normalize, remove units
        # self.norm_data.insert(0, 'C_TIME', self.tables['C_TIME'])
        # self.set_yml({'mean': self.mean.to_dict(), 'std': self.std.to_dict()})
        # self.start_num_in_test = 0

    def get_processed_data(self):
        excel_data_path = self.opt.excel_data_path
        data_format = self.opt.data_format
        dq_path = excel_data_path + data_format["dq"]
        rp_path = excel_data_path + data_format["rp"]
        envir_path = excel_data_path + data_format["envir"]
        dq_columns = ['C_FORECAST_TIME', 'C_FP_VALUE']
        rp_columns = ['C_TIME', 'C_REAL_VALUE']  # TODO: the csv export quotes header names, so read_csv may need ["'C_TIME'", "'C_REAL_VALUE'"]
        # envir = self.read_data(envir_path).loc[:, "C_TIME":]  # TODO: the export follows the table's column order, but read_csv reads in csv column order
        # envir = self.data_cleaning(envir)
        # envir["C_TIME"] = envir["C_TIME"].apply(timestr_to_datetime)
        # envir.set_index('C_TIME', inplace=True)
        # envir = self.drop_duplicated(envir)
        # Each frame read here goes through the same three steps: time conversion, cleaning, de-duplication.
        dq = self.read_data(dq_path, dq_columns)
        dq = dq.rename(columns={"C_FORECAST_TIME": "C_TIME"})
        dq["C_TIME"] = dq["C_TIME"].apply(timestr_to_datetime)
        dq.set_index('C_TIME', inplace=True)
        dq = self.data_cleaning(dq)
        dq = self.drop_duplicated(dq)
        rp = self.read_data(rp_path, rp_columns)
        rp["C_TIME"] = rp["C_TIME"].apply(timestr_to_datetime)
        rp.set_index('C_TIME', inplace=True)  # NaN values may still end up in the index column
        rp = self.data_cleaning(rp)
        rp = self.drop_duplicated(rp)
        df = self.tables_unite(rp, dq)
        dfs = self.missing_time_splite(df)
        dfs = [self.data_fill(df) for df in dfs]
        self.norm(dfs)  # normalization - still to be finalized
        return dfs
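
    # The opt object is expected to provide excel_data_path (str),
    # data_format (a dict with keys "dq", "rp" and "envir") and config_yaml
    # (the path used by set_yml below); opt.mean / opt.std may appear once
    # norm() has written them back into the config.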

    def norm(self, dfs):
        df = pd.concat(dfs, axis=0)
        df = df.reset_index()
        df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
        mean = np.mean(df, axis=0)  # per-column mean (note: newer pandas may need non-numeric columns excluded first)
        std = np.std(df, axis=0)  # per-column standard deviation
        if not hasattr(self.opt, 'mean') or not hasattr(self.opt, 'std'):
            self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
        self.mean, self.std = mean.to_dict(), std.to_dict()
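
    # Hedged sketch (not part of the original class): applying the saved
    # statistics to z-score a frame, assuming self.mean / self.std were
    # produced by norm() above; apply_norm is a hypothetical helper name.
    #
    #   def apply_norm(self, df):
    #       for col, m in self.mean.items():
    #           if col in df.columns and self.std.get(col):
    #               df[col] = (df[col] - m) / self.std[col]
    #       return df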

    def data_cleaning(self, data):
        data = data.replace(-99, np.nan)
        # Drop columns that are more than 30% NaN.
        data = data.dropna(axis=1, thresh=len(data) * 0.7)
        # Replace remaining NaN with 0 (open question: is 0 a sensible fill for convolutional learning?)
        data = data.replace(np.nan, 0)
        # Drop constant columns, i.e. columns where every value is identical.
        data = data.loc[:, (data != data.iloc[0]).any()]
        return data
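
    # Note on dropna(thresh=...): a column is kept only if it has at least
    # len(data) * 0.7 non-NaN values, i.e. columns with more than 30% NaN
    # are dropped. Toy example (hypothetical values):
    #
    #   df = pd.DataFrame({'a': [1, np.nan, np.nan], 'b': [1, 2, 3]})
    #   df.dropna(axis=1, thresh=len(df) * 0.7)  # keeps only column 'b'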

    def missing_time_splite(self, df):
        dt = pd.Timedelta(minutes=15)
        day1 = pd.Timedelta(days=1)
        cnt = 0  # number of gaps in the 15-minute grid
        cnt1 = 0  # number of gaps of one day or longer
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df.index[i] - df.index[i - 1] >= day1:
                df_x = df.iloc[start_index:i, ]
                dfs.append(df_x)
                start_index = i
                cnt1 += 1
            if df.index[i] - df.index[i - 1] != dt:
                print(df.index[i - 1], end=" ~ ")
                print(df.index[i])
                cnt += 1
        dfs.append(df.iloc[start_index:, ])
        print("total rows:", len(df), ", missing segments:", cnt, ", of which longer than one day:", cnt1)
        return dfs
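
    # Toy example (hypothetical timestamps): for the index
    #   09:00, 09:15, 09:45, 09:45 + 1 day
    # the 09:15 -> 09:45 step breaks the 15-minute grid (cnt), and the
    # day-long jump increments both counters (it is >= 1 day and != 15 min),
    # so the frame is split into two segments with cnt == 2, cnt1 == 1.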

    def data_fill(self, df):
        df = df.resample('15T').bfill()  # reindex onto a regular 15-minute grid and backward-fill
        return df
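
    # resample('15T') reindexes each segment onto a regular 15-minute grid
    # and bfill() fills every introduced slot from the next observed row.
    # Sketch (hypothetical values):
    #
    #   s = pd.Series([1.0, 2.0],
    #                 index=pd.to_datetime(['2023-03-17 10:00',
    #                                       '2023-03-17 10:45']))
    #   s.resample('15T').bfill()  # 10:15 and 10:30 both become 2.0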

    def set_yml(self, yml_dict):
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        for k, v in yml_dict.items():
            cfg[k] = v
        with open(self.opt.config_yaml, 'w', encoding='utf-8') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)

    def read_data(self, path, cols=None, index_col=None):
        init_data = pd.read_csv(path, usecols=cols, index_col=index_col)
        return init_data

    def filter_data(self):
        # NOTE: self.tables is never assigned in this class; this method
        # appears to be legacy ndarray-era code.
        # Real power must not be 0; a value of 0 means no generation.
        check_table = self.tables[:, 2]
        preserve_index = list(np.nonzero(check_table)[0])
        indices = list(range(len(self.tables)))
        del_index = list(set(indices) - set(preserve_index))
        self.tables = np.delete(self.tables, del_index, axis=0)
        return self.tables

    def drop_duplicated(self, df):
        df = df.groupby(level=0).mean()  # de-duplicate the DatetimeIndex by averaging rows that share a timestamp
        return df

    def tables_unite(self, t1, t2):
        return pd.merge(t1, t2, left_index=True, right_index=True)
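
    # Note: pd.merge defaults to an inner join, so only timestamps present
    # in both rp and dq survive into the united frame.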


if __name__ == "__main__":
    # NOTE: "DataSet" is not defined in this file; the class above is
    # data_process, which expects an opt/config object.
    # ds = data_process(opt)
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp)  # compute average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
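
# Hedged usage sketch (file names here are assumptions, not part of this
# project): build a minimal opt object from the yaml config and run the
# pipeline end to end.
#
#   from types import SimpleNamespace
#   with open('config.yaml', encoding='utf-8') as f:  # hypothetical path
#       opt = SimpleNamespace(**yaml.safe_load(f))
#   opt.config_yaml = 'config.yaml'
#   dp = data_process(opt)
#   dfs = dp.get_processed_data()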