# data_process.py
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/3/17 10:10
  4. # file: main.py
  5. # author: David
  6. # company: shenyang JY
  7. import pandas as pd
  8. import os
  9. import numpy as np
  10. from data_utils import *
  11. import yaml
  12. class data_process(object):
  13. def __init__(self, opt):
  14. self.std = None
  15. self.mean = None
  16. self.opt = opt
  17. # 主要是联立后的补值操作
  18. def get_processed_data(self, *args):
  19. """
  20. :param args:
  21. :return: 返回分割补值处理好的npy向量文件
  22. """
  23. csv_data_path = self.opt.csv_data_path
  24. nwp = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["nwp"]))
  25. cluster_power = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["cluter_power"]))
  26. # 第一步:联立
  27. unite = pd.merge(nwp, cluster_power, on='C_TIME')
  28. # 第二步:计算间隔
  29. unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
  30. unite['time_diff'] = unite['C_TIME'].diff()
  31. dt_short = pd.Timedelta(minutes=15)
  32. dt_long = pd.Timedelta(minutes=15 * 10)
  33. dfs = self.missing_time_splite(unite, dt_short, dt_long)
  34. miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
  35. miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
  36. print("再次测算,需要插值的总点数为:", miss_number)
  37. dfs_train, dfs_test = self.data_fill(dfs, [8])
  38. self.normalize(dfs_train) # 归一化
  39. return dfs_train, dfs_test
  40. def normalize(self, dfs):
  41. """
  42. 暂时不将C_TIME归一化
  43. :param dfs:
  44. :return:
  45. """
  46. df = pd.concat(dfs, axis=0)
  47. # df = df.reset_index()
  48. # df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
  49. mean = np.mean(df.iloc[1:, :], axis=0) # 数据的均值
  50. std = np.std(df.iloc[1:, :], axis=0) # 标准差
  51. # if hasattr(self.opt, 'mean') is False or hasattr(self.opt, 'std') is False:
  52. # self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
  53. print("归一化参数,均值为:{},方差为:{}".format(mean.to_dict(), std.to_dict()))
  54. self.mean, self.std = mean.to_dict(), std.to_dict()
  55. def missing_time_splite(self, df, dt_short, dt_long):
  56. n_long, n_short, n_points = 0, 0, 0
  57. start_index = 0
  58. dfs = []
  59. for i in range(1, len(df)):
  60. if df['time_diff'][i] >= dt_long:
  61. df_long = df.iloc[start_index:i, :-1]
  62. dfs.append(df_long)
  63. start_index = i
  64. n_long += 1
  65. if df['time_diff'][i] > dt_short:
  66. print(df['C_TIME'][i-1], end=" ~ ")
  67. print(df['C_TIME'][i], end=" ")
  68. points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
  69. print("缺失点数:", points)
  70. if df['time_diff'][i] < dt_long:
  71. n_short += 1
  72. n_points += points
  73. print("需要补值的点数:", points)
  74. dfs.append(df.iloc[start_index:, :-1])
  75. print("数据总数:", len(df), ",时序缺失的间隔:", n_short, "其中,较长的时间间隔:", n_long)
  76. print("需要补值的总点数:", n_points)
  77. return dfs
  78. def data_fill(self, dfs, test):
  79. dfs_train = []
  80. for i, df in enumerate(dfs):
  81. df.set_index('C_TIME', inplace=True)
  82. dff = df.resample('15T').bfill()
  83. dff.reset_index(inplace=True)
  84. points = len(dff) - len(df)
  85. if i not in test:
  86. dfs_train.append(dff)
  87. print("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
  88. else:
  89. print("{} ~ {} 有 {} 个点, 缺失 {} 个点.".format(df.index[0], df.index[-1], len(dff), points))
  90. dfs_test = [dfs[t] for t in test]
  91. return dfs_train, dfs_test
  92. def set_yml(self, yml_dict):
  93. with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
  94. cfg = yaml.safe_load(f)
  95. for k, v in yml_dict.items():
  96. cfg[k] = v
  97. with open(self.opt.config_yaml, 'w') as f:
  98. yaml.safe_dump(cfg, f, default_flow_style=False)
  99. def drop_duplicated(self, df):
  100. df = df.groupby(level=0).mean() # DatetimeIndex时间索引去重
  101. return df
if __name__ == "__main__":
    # Exploratory pipeline retained from development; 'ds' is not imported
    # in this file, so these lines are inert — NOTE(review): confirm whether
    # they can be deleted or belong in a separate driver script.
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp)  # compute average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
    pass