data_process.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: main.py
# author: David
# company: shenyang JY
import pandas as pd
import os
import numpy as np
from data_utils import *
import yaml


class data_process(object):
    def __init__(self, opt):
        self.std = None
        self.mean = None
        self.opt = opt

    # Mainly the gap-filling step performed after the merge
    def get_processed_data(self, test_list):
        """
        :param test_list: indices of the segments reserved for testing
        :return: the split, gap-filled training and test data sets
        """
        csv_data_path = self.opt.csv_data_path
        nwp = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["nwp"]))
        cluster_power = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["cluter_power"]))
        cluster_power['C_TIME'] = pd.to_datetime(cluster_power['C_TIME'])
        cluster_power['C_TIME'] = cluster_power['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        # Step 1: merge
        unite = pd.merge(nwp, cluster_power, on='C_TIME')
        # Step 2: compute the time gaps
        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
        unite['time_diff'] = unite['C_TIME'].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * 10)
        dfs = self.missing_time_splite(unite, dt_short, dt_long)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
        print("Recheck: total number of points that need interpolation:", miss_number)
        dfs_train, dfs_test = self.data_fill(dfs, test_list)
        self.normalize(dfs_train)  # normalization
        return dfs_train, dfs_test

    def normalize(self, dfs):
        """
        For now, C_TIME is not normalized
        :param dfs: list of training DataFrames
        :return:
        """
        df = pd.concat(dfs, axis=0)
        # df = df.reset_index()
        # df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
        mean = np.mean(df.iloc[1:, :], axis=0)  # mean of the data
        std = np.std(df.iloc[1:, :], axis=0)  # standard deviation
        # if hasattr(self.opt, 'mean') is False or hasattr(self.opt, 'std') is False:
        #     self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
        print("Normalization parameters, mean: {}, std: {}".format(mean.to_dict(), std.to_dict()))
        self.mean, self.std = mean.to_dict(), std.to_dict()
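
    # --- Illustrative sketch (not part of the original file) ----------------
    # normalize() only computes and stores the mean/std dictionaries; applying
    # them to a feature frame could look like the hypothetical helper below
    # (skipping the 'C_TIME' column is an assumption):
    #
    # def apply_normalization(self, df):
    #     cols = [c for c in df.columns if c != 'C_TIME']
    #     for c in cols:
    #         df[c] = (df[c] - self.mean[c]) / self.std[c]
    #     return df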

    def missing_time_splite(self, df, dt_short, dt_long):
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                print(df['C_TIME'][i - 1], end=" ~ ")
                print(df['C_TIME'][i], end=" ")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                print("missing points:", points)
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    print("points that need filling:", points)
        dfs.append(df.iloc[start_index:, :-1])
        print("total rows:", len(df), ", gaps in the time series:", n_short, ", of which long gaps:", n_long)
        print("total points that need filling:", n_points)
        return dfs[1:]

    def data_fill(self, dfs, test):
        dfs_train, dfs_test = [], []
        for i, df in enumerate(dfs):
            df1 = df.set_index('C_TIME', inplace=False)
            dff = df1.resample('15T').bfill()
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            if i not in test:
                if i == 0:
                    dff = dff.iloc[8:, :].reset_index(drop=True)
                dfs_train.append(dff)
                print("{} ~ {}: {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            else:
                print("{} ~ {}: {} points, {} missing. (test set)".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
                dfs_test.append(dfs[i].reset_index(drop=True))
        return dfs_train, dfs_test

    def set_yml(self, yml_dict):
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        for k, v in yml_dict.items():
            cfg[k] = v
        with open(self.opt.config_yaml, 'w') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)

    def drop_duplicated(self, df):
        df = df.groupby(level=0).mean()  # deduplicate rows sharing the same DatetimeIndex entry
        return df


if __name__ == "__main__":
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp)  # compute the average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
    pass
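
# --- Usage sketch (not part of the original file) ----------------------------
# A minimal, hypothetical example of how data_process might be driven.
# `opt` is assumed to expose csv_data_path, data_format and config_yaml; the
# file names and test indices below are placeholders.
#
# from types import SimpleNamespace
# opt = SimpleNamespace(
#     csv_data_path="./data",
#     data_format={"nwp": "nwp.csv", "cluter_power": "cluster_power.csv"},
#     config_yaml="./config.yaml",
# )
# dp = data_process(opt)
# dfs_train, dfs_test = dp.get_processed_data(test_list=[0])
# print(len(dfs_train), "training segments,", len(dfs_test), "test segments")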