#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: main.py
# author: David
# company: shenyang JY
import pandas as pd
import os
import numpy as np
from data_utils import *
import yaml


class data_process(object):
    def __init__(self, opt):
        self.std = None
        self.mean = None
        self.opt = opt

    # Mainly the gap-filling step that follows the table merge
    def get_processed_data(self, *args):
        """
        :param args:
        :return: the split, gap-filled data ready to be saved as npy vectors
        """
        csv_data_path = self.opt.csv_data_path
        nwp = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["nwp"]))
        cluster_power = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["cluter_power"]))

        # Step 1: merge the NWP and cluster-power tables on the timestamp
        unite = pd.merge(nwp, cluster_power, on='C_TIME')

        # Step 2: compute the time gap between consecutive rows
        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
        unite['time_diff'] = unite['C_TIME'].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * 10)
        dfs = self.missing_time_splite(unite, dt_short, dt_long)

        # Cross-check: count the points that will need interpolation
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
        print("Re-check: total number of points that need interpolation:", miss_number)

        dfs_train, dfs_test = self.data_fill(dfs, [8])
        self.normalize(dfs_train)  # normalization
        return dfs_train, dfs_test

    def normalize(self, dfs):
        """
        C_TIME is not normalized for now
        :param dfs: list of training DataFrames
        :return:
        """
        df = pd.concat(dfs, axis=0)
        # df = df.reset_index()
        # df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
        mean = np.mean(df.iloc[1:, :], axis=0)  # column means
        std = np.std(df.iloc[1:, :], axis=0)    # column standard deviations
        # if hasattr(self.opt, 'mean') is False or hasattr(self.opt, 'std') is False:
        #     self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
        print("Normalization parameters, mean: {}, std: {}".format(mean.to_dict(), std.to_dict()))
        self.mean, self.std = mean.to_dict(), std.to_dict()

    def missing_time_splite(self, df, dt_short, dt_long):
        """
        Split the merged table at long gaps (>= dt_long) and report short gaps
        (> dt_short, < dt_long) that will be filled later.
        :param df: merged DataFrame with a 'time_diff' column
        :return: list of DataFrame segments (without the 'time_diff' column)
        """
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                print(df['C_TIME'][i - 1], end=" ~ ")
                print(df['C_TIME'][i], end=" ")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                print("missing points:", points)
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    print("points to fill:", points)
        dfs.append(df.iloc[start_index:, :-1])
        print("Total rows:", len(df), ", gaps in the time series:", n_short, ", of which long gaps:", n_long)
        print("Total points to fill:", n_points)
        return dfs

    def data_fill(self, dfs, test):
        """
        Resample each segment to a 15-minute grid and backfill the short gaps.
        Segments whose index is listed in `test` are held out as the test set.
        """
        dfs_train = []
        for i, df in enumerate(dfs):
            df.set_index('C_TIME', inplace=True)
            dff = df.resample('15T').bfill()
            dff.reset_index(inplace=True)
            points = len(dff) - len(df)
            if i not in test:
                dfs_train.append(dff)
                print("{} ~ {} has {} points, {} points filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            else:
                print("{} ~ {} has {} points, {} points missing.".format(df.index[0], df.index[-1], len(dff), points))
        dfs_test = [dfs[t] for t in test]
        return dfs_train, dfs_test

    def set_yml(self, yml_dict):
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        for k, v in yml_dict.items():
            cfg[k] = v
        with open(self.opt.config_yaml, 'w') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)

    def drop_duplicated(self, df):
        df = df.groupby(level=0).mean()  # deduplicate rows sharing the same DatetimeIndex by averaging
        return df


if __name__ == "__main__":
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp)    # compute average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
    pass
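
    # A minimal usage sketch, kept commented out like the block above. It assumes
    # `opt` is loaded from a YAML config that exposes csv_data_path, data_format
    # and config_yaml as attributes; argparse.Namespace and the "config.yml" path
    # are illustrative assumptions, not part of this module.
    #
    # import argparse
    # with open("config.yml", "r", encoding="utf-8") as f:
    #     opt = argparse.Namespace(**yaml.safe_load(f))
    # dp = data_process(opt)
    # dfs_train, dfs_test = dp.get_processed_data()
    # print(len(dfs_train), "training segments,", len(dfs_test), "test segments")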