#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2024/5/6 13:52
# file: data_process.py
# author: David
# company: shenyang JY
import logging
import os

import numpy as np
import pandas as pd

from data_cleaning import rm_duplicated

np.random.seed(42)


class DataProcess(object):
    """Split 15-minute time series at long gaps and optionally back-fill short gaps.

    Settings are read from ``opt.Model``:
      * ``how_long_fill``: gap length, in 15-minute steps, at or above which the
        series is split into separate segments instead of being filled;
      * ``train_data_fill`` / ``predict_data_fill``: whether short gaps are
        back-filled for training / prediction data respectively.
    """

    def __init__(self, opt, logger=None):
        """
        opt: configuration object exposing a ``Model`` mapping (see class docstring).
        logger: optional logger used for per-gap / per-segment details; defaults
            to this module's logger.
        """
        self.opt = opt
        if logger is not None:
            self.logger = logger
        elif not hasattr(self, "logger"):
            # Respect a class-level / subclass-provided ``logger`` if present;
            # otherwise fall back so ``self.logger.info`` never raises AttributeError.
            self.logger = logging.getLogger(__name__)

    def get_train_data(self, unite):
        """Split the training frame at long gaps and, if configured, fill short gaps.

        ``unite`` is modified in place: ``C_TIME`` is converted to datetime and a
        ``time_diff`` column is added. Returns a list of DataFrame segments.
        """
        return self._split_and_fill(unite, "train_data_fill", test=False)

    def get_test_data(self, unite):
        """Split the prediction frame at long gaps and, if configured, fill short gaps.

        ``unite`` is modified in place exactly as in ``get_train_data``.
        Returns a list of DataFrame segments.
        """
        return self._split_and_fill(unite, "predict_data_fill", test=True)

    def _split_and_fill(self, unite, fill_key, test):
        """Shared pipeline for train/test data.

        fill_key: ``opt.Model`` key of the flag that enables filling.
        test: forwarded to ``data_fill`` (only affects its summary message).
        """
        # Step 1: time interval between consecutive rows.
        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
        unite['time_diff'] = unite['C_TIME'].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        segments = self.missing_time_splite(unite, dt_short, dt_long)
        # Re-count the points that fall inside short (fillable) gaps.
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
        print("再次测算,需要插值的总点数为:{}".format(miss_number))
        # Step 2: fill short gaps, only when there is something to fill.
        if miss_number > 0 and self.opt.Model[fill_key]:
            segments = self.data_fill(segments, test=test)
        return segments

    def missing_time_splite(self, df, dt_short, dt_long):
        """Split ``df`` into contiguous segments at long time gaps.

        ``df`` must carry a ``time_diff`` column holding the delta to the previous
        row (as produced by ``get_train_data`` / ``get_test_data``).

        dt_short: sampling interval of the data (one point every 15 min).
        dt_long: gaps shorter than this are kept for later filling; gaps of at
            least this length start a new segment.

        Returns a list of DataFrames (one per segment) without the ``time_diff``
        column.
        """
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        # Positional access throughout so any index (not just 0..n-1) works.
        diffs = df['time_diff']
        times = df['C_TIME']
        # Vectorise once instead of recomputing total_seconds() on every row.
        gap_points = (diffs.dt.total_seconds() / (60 * 15) - 1).to_numpy()
        is_split = (diffs >= dt_long).to_numpy()
        is_gap = (diffs > dt_short).to_numpy()
        is_short = (diffs < dt_long).to_numpy()
        # Drop the helper column by name rather than assuming it is the last one.
        keep_cols = df.columns != 'time_diff'
        for i in range(1, len(df)):
            if is_split[i]:
                dfs.append(df.iloc[start_index:i, keep_cols])
                start_index = i
                n_long += 1
            if is_gap[i]:
                self.logger.info(f"{times.iloc[i - 1]} ~ {times.iloc[i]}")
                points = gap_points[i]
                self.logger.info("缺失点数:{}".format(points))
                if is_short[i]:
                    n_short += 1
                    n_points += points
                    self.logger.info("需要补值的点数:{}".format(points))
        dfs.append(df.iloc[start_index:, keep_cols])
        print(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}")
        print("需要补值的总点数:{}".format(n_points))
        return dfs

    def data_fill(self, dfs, test=False):
        """Back-fill missing 15-minute points inside each segment.

        dfs: list of DataFrames to fill (each with a ``C_TIME`` column).
        test: True for prediction data, False for training data; only changes the
            label in the summary message.

        Returns a list of filled DataFrames with ``C_TIME`` as the first column.
        """
        dfs_fill = []
        for df in dfs:
            # rm_duplicated comes from data_cleaning; presumably removes duplicate
            # rows/timestamps so resampling is well-defined — confirm there.
            df = rm_duplicated(df)
            df1 = df.set_index('C_TIME', inplace=False)
            # Upsample to a regular 15-minute grid; inserted timestamps take the
            # values of the next observed row (backward fill). Other fill methods
            # could be substituted here.
            dff = df1.resample('15min').bfill()
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
        name = "预测数据" if test else "训练集"
        print("{}分成了{}段".format(name, len(dfs_fill)))
        return dfs_fill