123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2023/3/17 10:10
- # file: main.py
- # author: David
- # company: shenyang JY
- import pandas as pd
- import os
- import numpy as np
- from data_utils import *
- import yaml
class data_process(object):
    """Join NWP and cluster-power CSVs, split them on time gaps, impute missing
    15-minute points, and compute normalization statistics.

    Attributes:
        opt:  configuration object; must expose ``csv_data_path``,
              ``data_format`` (dict with keys ``"nwp"`` and ``"cluter_power"`` —
              note the key is spelled this way in the config) and
              ``config_yaml``.
        mean: per-column means of the training data (dict), set by ``normalize``.
        std:  per-column standard deviations (dict), set by ``normalize``.
    """

    def __init__(self, opt):
        # Normalization stats are filled in later by normalize().
        self.std = None
        self.mean = None
        self.opt = opt

    # Mainly the gap-filling (imputation) step performed after joining the tables.
    def get_processed_data(self, test_list):
        """Load, join, split and impute the raw CSV data.

        :param test_list: indices (into the gap-split segments) that should be
                          held out as test segments instead of being imputed.
        :return: tuple ``(dfs_train, dfs_test)`` of lists of DataFrames; the
                 training segments are resampled/back-filled to a 15-min grid.
        """
        csv_data_path = self.opt.csv_data_path
        nwp = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["nwp"]))
        # NOTE: "cluter_power" (sic) is the actual key used in the config file.
        cluster_power = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["cluter_power"]))
        # Round-trip through datetime so the string format matches the NWP file
        # before merging on the C_TIME column.
        cluster_power['C_TIME'] = pd.to_datetime(cluster_power['C_TIME'])
        cluster_power['C_TIME'] = cluster_power['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        # Step 1: inner-join the two tables on timestamp.
        unite = pd.merge(nwp, cluster_power, on='C_TIME')
        # Step 2: compute the time gap between consecutive rows.
        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
        unite['time_diff'] = unite['C_TIME'].diff()
        dt_short = pd.Timedelta(minutes=15)        # nominal sampling interval
        dt_long = pd.Timedelta(minutes=15 * 10)    # gaps >= this split the data
        dfs = self.missing_time_splite(unite, dt_short, dt_long)
        # Re-count the points that will need interpolation (gaps longer than one
        # interval but shorter than the split threshold).
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
        print("再次测算,需要插值的总点数为:", miss_number)
        dfs_train, dfs_test = self.data_fill(dfs, test_list)
        self.normalize(dfs_train)  # compute normalization statistics
        return dfs_train, dfs_test

    def normalize(self, dfs):
        """Compute per-column mean/std over the concatenated training segments.

        C_TIME is deliberately not normalized for now.  Stats are stored on
        ``self.mean`` / ``self.std`` as dicts; nothing is written back to the
        DataFrames here.

        :param dfs: list of training DataFrames.
        :return: None (side effect: sets ``self.mean`` and ``self.std``).
        """
        df = pd.concat(dfs, axis=0)
        # df = df.reset_index()
        # df["C_TIME"] = df["C_TIME"].apply(datetime_to_timestr)
        # NOTE(review): iloc[1:, :] skips the first row of the concatenated
        # frame — presumably to drop a header-like row; confirm intent.
        mean = np.mean(df.iloc[1:, :], axis=0)  # column means
        std = np.std(df.iloc[1:, :], axis=0)    # column standard deviations
        # if hasattr(self.opt, 'mean') is False or hasattr(self.opt, 'std') is False:
        #     self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
        print("归一化参数,均值为:{},方差为:{}".format(mean.to_dict(), std.to_dict()))
        self.mean, self.std = mean.to_dict(), std.to_dict()

    def missing_time_splite(self, df, dt_short, dt_long):
        """Split the joined frame into contiguous segments at long time gaps.

        Rows are scanned in order; whenever the gap to the previous row is
        >= ``dt_long`` the frame is cut there.  Gaps between ``dt_short`` and
        ``dt_long`` are only counted (they will be imputed later, not split).

        :param df: merged DataFrame with a ``time_diff`` column (last column,
                   dropped from the returned segments via ``:-1``).
        :param dt_short: nominal sampling interval (15 min).
        :param dt_long: threshold at or above which the data is split.
        :return: list of segment DataFrames.  NOTE(review): ``dfs[1:]``
                 discards the first segment — appears deliberate; confirm.
        """
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                # Long gap: close the current segment (excluding time_diff col).
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                # Any gap larger than one interval: report it.
                print(df['C_TIME'][i-1], end=" ~ ")
                print(df['C_TIME'][i], end=" ")
                # Number of grid points missing inside this gap.
                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
                print("缺失点数:", points)
                if df['time_diff'][i] < dt_long:
                    # Short enough to impute rather than split.
                    n_short += 1
                    n_points += points
                    print("需要补值的点数:", points)
        # Trailing segment after the last long gap.
        dfs.append(df.iloc[start_index:, :-1])
        print("数据总数:", len(df), ",时序缺失的间隔:", n_short, "其中,较长的时间间隔:", n_long)
        print("需要补值的总点数:", n_points)
        return dfs[1:]

    def data_fill(self, dfs, test):
        """Impute training segments onto a 15-minute grid; keep test segments raw.

        :param dfs: list of segment DataFrames (first column is C_TIME).
        :param test: indices of segments to route to the test set (no imputation).
        :return: tuple ``(dfs_train, dfs_test)``.
        """
        dfs_train, dfs_test = [], []
        for i, df in enumerate(dfs):
            df1 = df.set_index('C_TIME', inplace=False)
            # Back-fill onto a regular 15-minute grid.
            dff = df1.resample('15T').bfill()
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)  # how many rows were inserted
            if i not in test:
                if i == 0:
                    # NOTE(review): drops the first 8 rows of the first training
                    # segment — presumably a warm-up trim; confirm.
                    dff = dff.iloc[8:, :].reset_index(drop=True)
                dfs_train.append(dff)
                print("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            else:
                print("{} ~ {} 有 {} 个点, 缺失 {} 个点.(测试集)".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
                # Test segments keep their original (un-imputed) rows.
                dfs_test.append(dfs[i].reset_index(drop=True))
        return dfs_train, dfs_test

    def set_yml(self, yml_dict):
        """Merge ``yml_dict`` into the YAML config file referenced by the options.

        :param yml_dict: mapping of top-level keys to write/overwrite.
        :return: None (side effect: rewrites ``self.opt.config_yaml`` on disk).
        """
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        for k, v in yml_dict.items():
            cfg[k] = v
        with open(self.opt.config_yaml, 'w') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)

    def drop_duplicated(self, df):
        """Deduplicate rows sharing the same DatetimeIndex value by averaging.

        :param df: DataFrame indexed by a DatetimeIndex.
        :return: DataFrame with one (mean) row per unique index value.
        """
        df = df.groupby(level=0).mean()  # deduplicate the DatetimeIndex
        return df
if __name__ == "__main__":
    # Legacy exploratory snippet, kept commented out for reference; it depends
    # on a `ds` helper module that is not imported here.
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp)  # compute average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
    pass
|