#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/31 11:14
# file: requestHandler.py
# author: David
# company: shenyang JY
import pandas as pd
import numpy as np
# NOTE(review): `logger` below is never used in this module (self.logger is used
# everywhere); it looks like an accidental IDE auto-import — confirm before removing.
from sqlalchemy.dialects.postgresql.psycopg2 import logger
from cache.inputData import dt_tag
from cache.data_cleaning import cleaning, key_field_row_cleaning, rm_duplicated
from validate import ValidationError


class requestHandler(object):
    """Parse prediction requests and keep a class-level cache of recent history.

    The caches ``history_rp`` (actual power) and ``env`` (environment monitor)
    are class attributes, so they are shared by every instance in the process
    and persist between requests.
    """

    # Cached actual-power history (DataFrame with a 'C_TIME' column), or None before the first request.
    history_rp = None
    # Cached environment-monitor history (DataFrame with a 'C_TIME' column), or None before the first request.
    env = None

    def __init__(self, log, args):
        """
        :param log: logger used for all diagnostic output
        :param args: object exposing ``parse_args_and_yaml()``; its result is stored as ``self.opt``
        """
        self.logger = log
        self.args = args
        self.opt = args.parse_args_and_yaml()

    @staticmethod
    def _time_at(df, pos):
        """Return the 'C_TIME' value at positional row ``pos`` of ``df``, or None if ``df`` is None/empty."""
        if df is None or df.empty:
            return None
        return df['C_TIME'].iloc[pos]

    def get_form_data(self, req):
        """Parse the request's ``data`` form field into DataFrames and update the caches.

        :param req: request object whose ``form.get("data")`` returns the payload string
        :return: tuple ``(history_dq, rp_inst, env_ave, nwp, dq)``
        :raises ValidationError: when the ``data`` field is missing
        """
        data = req.form.get("data")
        if data is None:
            raise ValidationError("data为空")
        # eval() doubles as a check that the payload parses into a dict; a JSON null
        # ({"field": null}) or an empty value ({"field": }) does not pass.
        # SECURITY(review): eval() executes arbitrary Python taken from the request
        # body. Prefer json.loads (JSON payloads) or ast.literal_eval (Python-literal
        # payloads) once the client format is confirmed.
        jata = eval(data.replace("\t", ""))
        nwp = pd.json_normalize(jata, record_path=["nwp"])
        dq = pd.json_normalize(jata, record_path=["dq"])
        history_dq = pd.json_normalize(jata, record_path=["history_dq"])
        history_rp1 = pd.json_normalize(jata, record_path=["history_rp"])
        env1 = pd.json_normalize(jata, record_path=["env"])

        nwp['C_TIME'] = pd.to_datetime(nwp['C_TIME'])
        nwp['DT_TAG'] = nwp['C_TIME'].map(dt_tag)
        dq['C_TIME'] = pd.to_datetime(dq['C_TIME'])
        history_dq['C_TIME'] = pd.to_datetime(history_dq['C_TIME'])
        history_rp1['C_TIME'] = pd.to_datetime(history_rp1['C_TIME'])
        history_rp1 = rm_duplicated(history_rp1, self.logger)
        env1['C_TIME'] = pd.to_datetime(env1['C_TIME'])
        self.logger.info("-----历史数据长度,实际功率:{},环境监测仪:{}-----".format(len(history_rp1), len(env1)))
        self.logger.info("历史数据最后一个点,实际功率:{},环境监测仪:{}".format(
            self._time_at(history_rp1, -1), self._time_at(env1, -1)))

        if len(history_rp1) == 1 and requestHandler.history_rp is not None:
            # 1. Incremental update: one new actual-power point and an existing cache.
            #    New rows are appended only when they start after the cached tail.
            if requestHandler.history_rp['C_TIME'].iloc[-1] < history_rp1['C_TIME'].iloc[0]:
                self.logger.info("合并前,实际功率:{}".format(len(requestHandler.history_rp)))
                requestHandler.history_rp = pd.concat([requestHandler.history_rp, history_rp1]).reset_index(drop=True)
            if requestHandler.env['C_TIME'].iloc[-1] < env1['C_TIME'].iloc[0]:
                self.logger.info("合并前,环境监测仪:{}".format(len(requestHandler.env)))
                requestHandler.env = pd.concat([requestHandler.env, env1]).reset_index(drop=True)
            self.logger.info("新增方法:实际功率:{},环境监测仪:{}".format(
                len(requestHandler.history_rp), len(requestHandler.env)))
        else:
            # 2. Full refresh or first request: the payload replaces the cache.
            requestHandler.history_rp = history_rp1
            requestHandler.env = env1
            self.logger.info("重更或初始方法:实际功率:{},测风塔:{}".format(
                len(requestHandler.history_rp), len(requestHandler.env)))

        # 3. Average the environment data and snap actual power onto a 15-minute grid.
        #    Both helpers also trim the class-level caches.
        excluded = {'C_TIME', 'C_FP_VALUE', 'C_REAL_VALUE', 'error'}
        env_columns = [col for col in self.opt.env_columns if col not in excluded] \
            or [self.opt.usable_power['env']]
        env_ave = self.his_data_ave(requestHandler.env, env_columns)
        rp_inst = self.his_data_inst(requestHandler.history_rp, ['C_REAL_VALUE', 'C_ABLE_VALUE', 'LIMIT_STATUS'])
        return history_dq, rp_inst, env_ave, nwp, dq

    def his_data_ave(self, his, cols):
        """Clean environment history and average it into left-labelled 15-minute buckets.

        Only the last ``opt.Model["his_points"]`` buckets are kept. The class-level
        ``env`` cache is then trimmed to start at the first kept bucket.

        NOTE(review): an earlier comment says too few points should yield
        errorCode=3; no such check happens here (presumably in the caller).

        :param his: environment history with a 'C_TIME' column
        :param cols: feature columns to keep and average
        :return: the averaged DataFrame, or the empty cleaned frame if cleaning removed every row
        """
        self.logger.info("气象站处理前时间范围:{}-{}".format(self._time_at(his, 0), self._time_at(his, -1)))
        # 1. Clean the data and select the features.
        if self.opt.Model["fusion"] is False:
            # -99 is treated as a missing-value marker; gaps are filled forward, then backward.
            his = his.replace(-99, np.nan).set_index('C_TIME').ffill().bfill().reset_index()
        his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
        if his_clean.empty:
            return his_clean

        self.logger.info("气象站清洗后时间范围:{}-{}".format(self._time_at(his_clean, 0), self._time_at(his_clean, -1)))
        his_filter = his_clean[['C_TIME'] + cols]
        # 2. Average each 15-minute bucket. NOTE(review): an earlier comment says empty
        #    buckets should become -99, but resample().mean() leaves them as NaN.
        his_ave15 = his_filter.resample('15min', on='C_TIME', label='left').mean().reset_index()
        self.logger.info("气象站重采样时间范围:{}-{}".format(self._time_at(his_ave15, 0), self._time_at(his_ave15, -1)))
        points = self.opt.Model["his_points"]
        his_ave15 = his_ave15.tail(points)
        self.logger.info("气象站处理后时间范围:{}-{}".format(self._time_at(his_ave15, 0), self._time_at(his_ave15, -1)))
        self.logger.info("更新缓存前,cls.env第一个点时间:{}".format(self._time_at(requestHandler.env, 0)))
        requestHandler.update_cache_env(self._time_at(his_ave15, 0))
        self.logger.info("更新缓存后,cls.env第一个点时间:{}".format(self._time_at(requestHandler.env, 0)))
        self.logger.info("清洗后不为空,气象站:{}".format(len(his_ave15)))
        return his_ave15

    def rp_data_ave(self, his, cols):
        """Clean actual-power history and average it into left-labelled 15-minute buckets.

        Keeps the last ``opt.Model["his_points"]`` buckets and trims the
        class-level ``history_rp`` cache to start at the first kept bucket.

        :param his: actual-power history with 'C_TIME' and 'LIMIT_STATUS' columns
        :param cols: key columns passed to the row cleaner
        :return: the averaged DataFrame, or the empty cleaned frame if cleaning removed every row
        """
        self.logger.info("实际功率处理前时间范围:{}-{}".format(self._time_at(his, 0), self._time_at(his, -1)))
        his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
        if his_clean.empty:
            return his_clean
        his_ave15 = his_clean.resample('15min', on='C_TIME', label='left').mean().reset_index()
        # An averaged status that is not exactly 0 or 1 (mixed bucket or NaN) is mapped to 1.
        status = his_ave15['LIMIT_STATUS']
        his_ave15['LIMIT_STATUS'] = status.where(status.isin([0, 1]), 1)
        points = self.opt.Model["his_points"]
        his_ave15 = his_ave15.tail(points)
        self.logger.info("实际功率处理后时间范围:{}-{}".format(self._time_at(his_ave15, 0), self._time_at(his_ave15, -1)))
        requestHandler.update_cache_rp(self._time_at(his_ave15, 0))
        return his_ave15

    def his_data_inst(self, his, cols):
        """Snap actual-power history onto a regular 15-minute grid of instantaneous values.

        Each timestamp is floored to its quarter hour, and the first non-null value
        per column in each quarter is kept. The result is reindexed onto a
        continuous 15-minute grid, with missing slots set to NaN. The last
        ``opt.Model["his_points"]`` rows are returned, and the class-level
        ``history_rp`` cache is trimmed to start at the first returned row.

        :param his: actual-power history with a 'C_TIME' column
        :param cols: value columns to keep
        :return: DataFrame with 'C_TIME' plus ``cols``, or the empty cleaned frame if cleaning removed every row
        """
        self.logger.info("实际功率处理前时间范围:{}-{}".format(self._time_at(his, 0), self._time_at(his, -1)))
        # Clean the data and select the features first.
        his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
        if his_clean.empty:
            return his_clean
        # Copy before mutating: the cleaner may return a slice of its input (can't tell from here).
        his_clean = his_clean.copy()
        # Floor to :00/:15/:30/:45 and drop seconds, so all points in a quarter share
        # one key and line up with the date_range grid below.
        his_clean['C_TIME'] = his_clean['C_TIME'].dt.floor('15min')
        his_inst = his_clean.groupby('C_TIME').first()[cols]
        grid = pd.date_range(start=his_inst.index.min(), end=his_inst.index.max(), freq='15min', name='C_TIME')
        his_filter = his_inst.reindex(grid).reset_index()
        points = self.opt.Model["his_points"]
        his_inst = his_filter.tail(points)
        self.logger.info(
            "实际功率处理后时间范围:{}-{}".format(self._time_at(his_inst, 0), self._time_at(his_inst, -1)))
        requestHandler.update_cache_rp(self._time_at(his_inst, 0))
        return his_inst

    def closest_minute(self, minute):
        """Return the quarter-hour mark (0, 15, 30 or 45) at or before ``minute``, for 0 <= minute <= 59.

        Despite the name this floors, not rounds: e.g. 14 -> 0. Marks after
        ``minute`` get key 99 so they are never chosen. This is no longer called
        inside this class; it is kept for existing callers.
        """
        target_minutes = [0, 15, 30, 45]
        return min(target_minutes, key=lambda x: abs(x - minute) if x - minute <= 0 else 99)

    @classmethod
    def update_cache_rp(cls, begin):
        """Drop cached actual-power rows with 'C_TIME' earlier than ``begin``."""
        cls.history_rp = cls.history_rp[cls.history_rp['C_TIME'] >= begin].reset_index(drop=True)

    @classmethod
    def update_cache_env(cls, begin):
        """Drop cached environment rows with 'C_TIME' earlier than ``begin``."""
        cls.env = cls.env[cls.env['C_TIME'] >= begin].reset_index(drop=True)