123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2023/3/31 11:14
- # file: requestHandler.py
- # author: David
- # company: shenyang JY
- import pandas as pd
- import numpy as np
- from sqlalchemy.dialects.postgresql.psycopg2 import logger
- from cache.inputData import dt_tag
- from cache.data_cleaning import cleaning, key_field_row_cleaning, rm_duplicated
- from validate import ValidationError
class requestHandler(object):
    """Parse a forecast request and keep the historical frames cached on the class.

    ``history_rp`` and ``env`` are class attributes, so the cache is shared by
    every instance. The methods address the cache through ``requestHandler``
    explicitly (not ``type(self)``).

    NOTE(review): this class was reconstructed from a listing whose indentation
    had been lost; nesting that could not be derived from syntax alone is
    marked with a NOTE(review) comment where it occurs.
    """

    # Cached real-power history; None until the first request has been handled.
    history_rp = None
    # Cached environment-monitor history; None until the first request.
    env = None

    def __init__(self, log, args):
        """Store the logger and arguments and load the parsed options.

        :param log: logger used for all progress messages.
        :param args: object exposing ``parse_args_and_yaml()``; its result is
            stored as ``self.opt``.
        """
        self.logger = log
        self.args = args
        self.opt = args.parse_args_and_yaml()

    @staticmethod
    def _time_span(frame):
        """Return the first and last ``C_TIME`` values of *frame* as a tuple."""
        times = frame['C_TIME']
        return times.iloc[0], times.iloc[-1]

    @staticmethod
    def _snap_to_quarter_hour(times):
        """Move each timestamp back to the latest :00/:15/:30/:45 minute mark.

        Only the minute field is changed; seconds and smaller units are kept,
        which matches ``ts.replace(minute=closest_minute(ts.minute))``.

        :param times: datetime-typed ``pandas.Series``.
        :return: Series of the same length with snapped timestamps.
        """
        return times - pd.to_timedelta(times.dt.minute % 15, unit='min')

    def get_form_data(self, req):
        """Decode the ``data`` form field and refresh the cached history.

        :param req: request object exposing ``form.get``.
        :return: tuple ``(history_dq, rp_inst, env_ave, nwp, dq)``.
        :raises ValidationError: if the ``data`` field is missing.
        """
        data = req.form.get("data")
        if data is None:
            raise ValidationError("data为空")
        # SECURITY(review): eval() executes arbitrary Python taken from the
        # request body. It is kept because it also defines which payloads are
        # accepted today (for example a JSON ``null`` is rejected); switching
        # to json.loads or ast.literal_eval needs a decision on that contract.
        jata = eval(data.replace("\t", ""))

        nwp = pd.json_normalize(jata, record_path=["nwp"])
        dq = pd.json_normalize(jata, record_path=["dq"])
        history_dq = pd.json_normalize(jata, record_path=["history_dq"])
        history_rp1 = pd.json_normalize(jata, record_path=["history_rp"])
        env1 = pd.json_normalize(jata, record_path=["env"])

        nwp['C_TIME'] = pd.to_datetime(nwp['C_TIME'])
        nwp['DT_TAG'] = nwp['C_TIME'].apply(dt_tag)
        dq['C_TIME'] = pd.to_datetime(dq['C_TIME'])
        history_dq['C_TIME'] = pd.to_datetime(history_dq['C_TIME'])
        history_rp1['C_TIME'] = pd.to_datetime(history_rp1['C_TIME'])
        history_rp1 = rm_duplicated(history_rp1, self.logger)
        env1['C_TIME'] = pd.to_datetime(env1['C_TIME'])

        self.logger.info("-----历史数据长度,实际功率:{},环境监测仪:{}-----".format(len(history_rp1), len(env1)))
        self.logger.info("历史数据最后一个点,实际功率:{},环境监测仪:{}".format(
            history_rp1['C_TIME'].iloc[-1], env1['C_TIME'].iloc[-1]))

        cache = requestHandler
        # 1. Append: a single new power point arrives and a cache already exists.
        # NOTE(review): both comparisons are assumed to sit inside this branch,
        # since the environment cache is only set together with history_rp.
        if len(history_rp1) == 1 and cache.history_rp is not None:
            # Only append points that are strictly newer than the cached tail.
            if cache.history_rp['C_TIME'].iloc[-1] < history_rp1['C_TIME'].iloc[0]:
                self.logger.info("合并前,实际功率:{}".format(len(cache.history_rp)))
                cache.history_rp = pd.concat([cache.history_rp, history_rp1]).reset_index(drop=True)
            if cache.env['C_TIME'].iloc[-1] < env1['C_TIME'].iloc[0]:
                self.logger.info("合并前,环境监测仪:{}".format(len(cache.env)))
                cache.env = pd.concat([cache.env, env1]).reset_index(drop=True)
            self.logger.info("新增方法:实际功率:{},环境监测仪:{}".format(len(cache.history_rp), len(cache.env)))
        # 2. Full refresh or first request: replace the cache outright.
        else:
            cache.history_rp = history_rp1
            cache.env = env1
            self.logger.info("重更或初始方法:实际功率:{},测风塔:{}".format(len(cache.history_rp), len(cache.env)))

        # 3. Average the environment data and trim the cache.
        excluded = ['C_TIME', 'C_FP_VALUE', 'C_REAL_VALUE', 'error']
        env_columns = [ele for ele in self.opt.env_columns if ele not in excluded]
        if not env_columns:
            env_columns = [self.opt.usable_power['env']]
        env_ave = self.his_data_ave(cache.env, env_columns)
        rp_inst = self.his_data_inst(cache.history_rp, ['C_REAL_VALUE', 'C_ABLE_VALUE', 'LIMIT_STATUS'])
        return history_dq, rp_inst, env_ave, nwp, dq

    def his_data_ave(self, his, cols):
        """Clean environment history and average it into 15-minute bins.

        Bins are labelled by their left edge. The last
        ``opt.Model["his_points"]`` bins are kept and the environment cache is
        trimmed to start at the first kept bin.

        :param his: environment history with a ``C_TIME`` column.
        :param cols: value columns to clean and average.
        :return: the averaged frame, or the (empty) cleaned frame when cleaning
            removed every row.
        """
        self.logger.info("气象站处理前时间范围:{}-{}".format(*self._time_span(his)))
        # NOTE(review): all five fill steps are assumed to belong to this
        # branch; confirm against the original indentation.
        if self.opt.Model["fusion"] is False:
            # Treat -99 as missing and fill gaps forward, then backward.
            # C_TIME is moved into the index so it is not touched by the fill.
            his = (
                his.replace(-99, np.nan)
                .set_index('C_TIME')
                .ffill()
                .bfill()
                .reset_index(drop=False)
            )
        his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
        if his_clean.empty:
            return his_clean

        self.logger.info("气象站清洗后时间范围:{}-{}".format(*self._time_span(his_clean)))
        his_filter = his_clean[['C_TIME'] + cols]
        # Mean per 15-minute window, labelled by the window's left edge.
        his_ave15 = his_filter.resample('15min', on='C_TIME', label='left').mean().reset_index()
        self.logger.info("气象站重采样时间范围:{}-{}".format(*self._time_span(his_ave15)))
        points = self.opt.Model["his_points"]
        his_ave15 = his_ave15.tail(points)
        self.logger.info("气象站处理后时间范围:{}-{}".format(*self._time_span(his_ave15)))
        self.logger.info("更新缓存前,cls.env第一个点时间:{}".format(requestHandler.env['C_TIME'].iloc[0]))
        requestHandler.update_cache_env(his_ave15['C_TIME'].iloc[0])
        self.logger.info("更新缓存后,cls.env第一个点时间:{}".format(requestHandler.env['C_TIME'].iloc[0]))
        self.logger.info("清洗后不为空,气象站:{}".format(len(his_ave15)))
        return his_ave15

    def rp_data_ave(self, his, cols):
        """Clean real-power history and average it into 15-minute bins.

        After averaging, any ``LIMIT_STATUS`` value other than 0 or 1 is set
        to 1. The last ``opt.Model["his_points"]`` bins are kept and the power
        cache is trimmed to start at the first kept bin.

        :param his: power history with a ``C_TIME`` column.
        :param cols: key columns handed to the row cleaning.
        :return: the averaged frame, or the (empty) cleaned frame.
        """
        self.logger.info("实际功率处理前时间范围:{}-{}".format(*self._time_span(his)))
        his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
        if his_clean.empty:
            return his_clean

        his_ave15 = his_clean.resample('15min', on='C_TIME', label='left').mean().reset_index()
        his_ave15['LIMIT_STATUS'] = his_ave15['LIMIT_STATUS'].apply(lambda x: x if x in [0, 1] else 1)
        points = self.opt.Model["his_points"]
        his_ave15 = his_ave15.tail(points)
        self.logger.info("实际功率处理后时间范围:{}-{}".format(*self._time_span(his_ave15)))
        requestHandler.update_cache_rp(his_ave15['C_TIME'].iloc[0])
        return his_ave15

    def his_data_inst(self, his, cols):
        """Pick one instantaneous power sample per 15-minute mark.

        Timestamps are snapped back to the latest quarter-hour minute, the
        first non-null value per snapped timestamp is kept, and the result is
        reindexed onto a gap-free 15-minute grid (missing slots become NaN).
        The last ``opt.Model["his_points"]`` rows are kept and the power cache
        is trimmed to start at the first kept row.

        :param his: power history with a datetime ``C_TIME`` column.
        :param cols: value columns to keep.
        :return: the sampled frame, or the (empty) cleaned frame.
        """
        self.logger.info("实际功率处理前时间范围:{}-{}".format(*self._time_span(his)))
        # Clean rows first, then select the value columns.
        his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
        if his_clean.empty:
            return his_clean

        # assign() builds a new frame. The cleaned frame may be the cached
        # frame or a view of it, so writing into it could leak snapped
        # timestamps into requestHandler.history_rp.
        snapped = his_clean.assign(C_TIME=self._snap_to_quarter_hour(his_clean['C_TIME']))
        his_inst = snapped.groupby('C_TIME').first().reset_index(drop=False)
        indexed = his_inst[['C_TIME'] + cols].set_index('C_TIME')
        grid = pd.date_range(start=indexed.index.min(), end=indexed.index.max(), freq='15min', name='C_TIME')
        his_filter = indexed.reindex(grid, fill_value=np.nan).reset_index(drop=False)
        points = self.opt.Model["his_points"]
        his_inst = his_filter.tail(points)
        self.logger.info("实际功率处理后时间范围:{}-{}".format(*self._time_span(his_inst)))
        requestHandler.update_cache_rp(his_inst['C_TIME'].iloc[0])
        return his_inst

    def closest_minute(self, minute):
        """Return the largest of 0, 15, 30, 45 that does not exceed *minute*.

        Falls back to 0 when no target qualifies (for example a negative
        input).
        """
        target_minutes = [0, 15, 30, 45]
        return max((target for target in target_minutes if target <= minute), default=0)

    @classmethod
    def update_cache_rp(cls, begin):
        """Drop cached power rows whose ``C_TIME`` is earlier than *begin*."""
        keep = cls.history_rp['C_TIME'] >= begin
        cls.history_rp = cls.history_rp[keep].reset_index(drop=True)

    @classmethod
    def update_cache_env(cls, begin):
        """Drop cached environment rows whose ``C_TIME`` is earlier than *begin*."""
        keep = cls.env['C_TIME'] >= begin
        cls.env = cls.env[keep].reset_index(drop=True)
|