liudawei 11 months ago
commit
9048258b97
1 changed files with 95 additions and 0 deletions
  1. 95 0
      data_fill.py

+ 95 - 0
data_fill.py

@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2024/5/6 13:52
+# file: data_process.py
+# author: David
+# company: shenyang JY
+import os
+import numpy as np
+import pandas as pd
+from data_cleaning import rm_duplicated
+np.random.seed(42)
+
+
+class DataProcess(object):
+    def __init__(self, opt):
+        self.opt = opt
+
+    def get_train_data(self, unite):
+        # 第一步:计算间隔
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_train = self.missing_time_splite(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
+        print("再次测算,需要插值的总点数为:{}".format(miss_number))
+        # 第二步:插值
+        if miss_number > 0 and self.opt.Model["train_data_fill"]:
+            data_train = self.data_fill(data_train)
+        return data_train
+
+    def get_test_data(self, unite):
+        # 第一步:计算间隔
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_test = self.missing_time_splite(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
+        print("再次测算,需要插值的总点数为:{}".format(miss_number))
+        # 第二步:插值
+        if self.opt.Model["predict_data_fill"] and miss_number > 0:
+            data_test = self.data_fill(data_test, test=True)
+        return data_test
+
+    def missing_time_splite(self, df, dt_short, dt_long):
+        """
+        分割方法
+        dt_short: 数据时间频率(15min一个点)
+        dt_long: 小于dt_long时长进行补值,大于dt_long时长进行分割
+        """
+        n_long, n_short, n_points = 0, 0, 0
+        start_index = 0
+        dfs = []
+        for i in range(1, len(df)):
+            if df['time_diff'][i] >= dt_long:
+                df_long = df.iloc[start_index:i, :-1]
+                dfs.append(df_long)
+                start_index = i
+                n_long += 1
+            if df['time_diff'][i] > dt_short:
+                self.logger.info(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
+                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
+                self.logger.info("缺失点数:{}".format(points))
+                if df['time_diff'][i] < dt_long:
+                    n_short += 1
+                    n_points += points
+                    self.logger.info("需要补值的点数:{}".format(points))
+        dfs.append(df.iloc[start_index:, :-1])
+        print(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}")
+        print("需要补值的总点数:{}".format(n_points))
+        return dfs
+
+    def data_fill(self, dfs, test=False):
+        """
+        补值方法
+        dfs:待补值的dataframe集合
+        test:训练集/测试集标识
+        """
+        dfs_fill, inserts = [], 0
+        for i, df in enumerate(dfs):
+            df = rm_duplicated(df)
+            df1 = df.set_index('C_TIME', inplace=False)
+            # 补值方法可选
+            dff = df1.resample('15T').bfill()
+            dff.reset_index(inplace=True)
+            points = len(dff) - len(df1)
+            dfs_fill.append(dff)
+            self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
+            inserts += points
+        name = "预测数据" if test is True else "训练集"
+        print("{}分成了{}段".format(name, len(dfs_fill)))
+        return dfs_fill