data_fill.py 4.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2024/5/6 13:52
  4. # file: data_fill.py
  5. # author: David
  6. # company: shenyang JY
  7. import os
  8. import numpy as np
  9. import pandas as pd
  10. from data_cleaning import rm_duplicated
  11. np.random.seed(42)
  12. class DataProcess(object):
  13. def __init__(self, opt):
  14. self.opt = opt
  15. def get_train_data(self, unite):
  16. # 第一步:计算间隔
  17. unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
  18. unite['time_diff'] = unite['C_TIME'].diff()
  19. dt_short = pd.Timedelta(minutes=15)
  20. dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
  21. data_train = self.missing_time_splite(unite, dt_short, dt_long)
  22. miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
  23. miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
  24. print("再次测算,需要插值的总点数为:{}".format(miss_number))
  25. # 第二步:插值
  26. if miss_number > 0 and self.opt.Model["train_data_fill"]:
  27. data_train = self.data_fill(data_train)
  28. return data_train
  29. def get_test_data(self, unite):
  30. # 第一步:计算间隔
  31. unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
  32. unite['time_diff'] = unite['C_TIME'].diff()
  33. dt_short = pd.Timedelta(minutes=15)
  34. dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
  35. data_test = self.missing_time_splite(unite, dt_short, dt_long)
  36. miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
  37. miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
  38. print("再次测算,需要插值的总点数为:{}".format(miss_number))
  39. # 第二步:插值
  40. if self.opt.Model["predict_data_fill"] and miss_number > 0:
  41. data_test = self.data_fill(data_test, test=True)
  42. return data_test
  43. def missing_time_splite(self, df, dt_short, dt_long):
  44. """
  45. 分割方法
  46. dt_short: 数据时间频率(15min一个点)
  47. dt_long: 小于dt_long时长进行补值,大于dt_long时长进行分割
  48. """
  49. n_long, n_short, n_points = 0, 0, 0
  50. start_index = 0
  51. dfs = []
  52. for i in range(1, len(df)):
  53. if df['time_diff'][i] >= dt_long:
  54. df_long = df.iloc[start_index:i, :-1]
  55. dfs.append(df_long)
  56. start_index = i
  57. n_long += 1
  58. if df['time_diff'][i] > dt_short:
  59. self.logger.info(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
  60. points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
  61. self.logger.info("缺失点数:{}".format(points))
  62. if df['time_diff'][i] < dt_long:
  63. n_short += 1
  64. n_points += points
  65. self.logger.info("需要补值的点数:{}".format(points))
  66. dfs.append(df.iloc[start_index:, :-1])
  67. print(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}")
  68. print("需要补值的总点数:{}".format(n_points))
  69. return dfs
  70. def data_fill(self, dfs, test=False):
  71. """
  72. 补值方法
  73. dfs:待补值的dataframe集合
  74. test:训练集/测试集标识
  75. """
  76. dfs_fill, inserts = [], 0
  77. for i, df in enumerate(dfs):
  78. df = rm_duplicated(df)
  79. df1 = df.set_index('C_TIME', inplace=False)
  80. # 补值方法可选
  81. dff = df1.resample('15T').bfill()
  82. dff.reset_index(inplace=True)
  83. points = len(dff) - len(df1)
  84. dfs_fill.append(dff)
  85. self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
  86. inserts += points
  87. name = "预测数据" if test is True else "训练集"
  88. print("{}分成了{}段".format(name, len(dfs_fill)))
  89. return dfs_fill