request.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/3/31 11:14
  4. # file: requestHandler.py
  5. # author: David
  6. # company: shenyang JY
  7. import pandas as pd
  8. import numpy as np
  9. from sqlalchemy.dialects.postgresql.psycopg2 import logger
  10. from cache.inputData import dt_tag
  11. from cache.data_cleaning import cleaning, key_field_row_cleaning, rm_duplicated
  12. from validate import ValidationError
  13. class requestHandler(object):
  14. history_rp = None
  15. env = None
  16. def __init__(self, log, args):
  17. self.logger = log
  18. self.args = args
  19. self.opt = args.parse_args_and_yaml()
  20. def get_form_data(self, req):
  21. data = req.form.get("data")
  22. if data is None:
  23. raise ValidationError("data为空")
  24. # 这个eval()函数会验证字符串是否转换成字典 字段为Null,传回来的是什么格式?
  25. # {"字段": null} {"字段": } 都不会通过验证
  26. jata = eval(data.replace("\t", ""))
  27. nwp = pd.json_normalize(jata, record_path=["nwp"])
  28. dq = pd.json_normalize(jata, record_path=["dq"])
  29. history_dq = pd.json_normalize(jata, record_path=["history_dq"])
  30. history_rp1 = pd.json_normalize(jata, record_path=["history_rp"])
  31. env1 = pd.json_normalize(jata, record_path=["env"])
  32. nwp['C_TIME'] = pd.to_datetime(nwp['C_TIME'])
  33. nwp['DT_TAG'] = nwp.apply(lambda x: dt_tag(x['C_TIME']), axis=1)
  34. dq['C_TIME'] = pd.to_datetime(dq['C_TIME'])
  35. history_dq['C_TIME'] = pd.to_datetime(history_dq['C_TIME'])
  36. history_rp1['C_TIME'] = pd.to_datetime(history_rp1['C_TIME'])
  37. history_rp1 = rm_duplicated(history_rp1, self.logger)
  38. env1['C_TIME'] = pd.to_datetime(env1['C_TIME'])
  39. self.logger.info("-----历史数据长度,实际功率:{},环境监测仪:{}-----".format(len(history_rp1), len(env1)))
  40. self.logger.info("历史数据最后一个点,实际功率:{},环境监测仪:{}".format(history_rp1.iloc[-1, history_rp1.columns.get_loc('C_TIME')], env1.iloc[-1, env1.columns.get_loc('C_TIME')]))
  41. # 1. 新增
  42. if len(history_rp1) == 1 and requestHandler.history_rp is not None:
  43. if requestHandler.history_rp.iloc[-1, requestHandler.history_rp.columns.get_loc('C_TIME')] < history_rp1.iloc[0, history_rp1.columns.get_loc('C_TIME')]:
  44. self.logger.info("合并前,实际功率:{}".format(len(requestHandler.history_rp)))
  45. requestHandler.history_rp = pd.concat([requestHandler.history_rp, history_rp1]).reset_index(drop=True)
  46. if requestHandler.env.iloc[-1, requestHandler.env.columns.get_loc('C_TIME')] < env1.iloc[0, env1.columns.get_loc('C_TIME')]:
  47. self.logger.info("合并前,环境监测仪:{}".format(len(requestHandler.env)))
  48. requestHandler.env = pd.concat([requestHandler.env, env1]).reset_index(drop=True)
  49. self.logger.info("新增方法:实际功率:{},环境监测仪:{}".format(len(requestHandler.history_rp), len(requestHandler.env)))
  50. # 2. 重更 或者 初始
  51. else:
  52. requestHandler.history_rp = history_rp1
  53. requestHandler.env = env1
  54. self.logger.info("重更或初始方法:实际功率:{},测风塔:{}".format(len(requestHandler.history_rp), len(requestHandler.env)))
  55. # 3. 取测风塔均值、更新缓存
  56. env_columns = [ele for ele in self.opt.env_columns if ele not in ['C_TIME', 'C_FP_VALUE', 'C_REAL_VALUE', 'error']]
  57. env_columns = env_columns if len(env_columns) > 0 else [self.opt.usable_power['env']]
  58. env_ave = self.his_data_ave(requestHandler.env, env_columns)
  59. rp_inst = self.his_data_inst(requestHandler.history_rp, ['C_REAL_VALUE', 'C_ABLE_VALUE', 'LIMIT_STATUS'])
  60. return history_dq, rp_inst, env_ave, nwp, dq
  61. def his_data_ave(self, his, cols):
  62. # 历史数据最后一个点,往前推history_env_hours个小时的点数,不够返回errorCode=3,以测风塔数据取15分钟间隔的左边界为准
  63. # 1. 先进行数据清洗和特征筛选
  64. self.logger.info("气象站处理前时间范围:{}-{}".format(his.iloc[0, his.columns.get_loc('C_TIME')],
  65. his.iloc[-1, his.columns.get_loc('C_TIME')]))
  66. if self.opt.Model["fusion"] is False:
  67. his = his.replace(-99, np.nan)
  68. his.set_index('C_TIME', inplace=True)
  69. his = his.fillna(method='ffill')
  70. his = his.fillna(method='bfill')
  71. his.reset_index(drop=False, inplace=True)
  72. his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
  73. if not his_clean.empty:
  74. self.logger.info("气象站清洗后时间范围:{}-{}".format(his_clean.iloc[0, his_clean.columns.get_loc('C_TIME')], his_clean.iloc[-1, his_clean.columns.get_loc('C_TIME')]))
  75. his_filter = his_clean[['C_TIME']+cols]
  76. # 2. 取时刻范围均值,都没有返回-99
  77. his_ave15 = his_filter.resample('15T', on='C_TIME', label='left').mean().reset_index()
  78. self.logger.info("气象站重采样时间范围:{}-{}".format(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')], his_ave15.iloc[-1, his_ave15.columns.get_loc('C_TIME')]))
  79. hours = self.opt.Model["his_points"]
  80. his_ave15 = his_ave15.tail(hours)
  81. self.logger.info("气象站处理后时间范围:{}-{}".format(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')], his_ave15.iloc[-1, his_ave15.columns.get_loc('C_TIME')]))
  82. self.logger.info("更新缓存前,cls.env第一个点时间:{}".format(requestHandler.env.iloc[0, requestHandler.env.columns.get_loc('C_TIME')]))
  83. requestHandler.update_cache_env(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')])
  84. self.logger.info("更新缓存后,cls.env第一个点时间:{}".format(requestHandler.env.iloc[0, requestHandler.env.columns.get_loc('C_TIME')]))
  85. self.logger.info("清洗后不为空,气象站:{}".format(len(his_ave15)))
  86. return his_ave15
  87. else:
  88. return his_clean
  89. def rp_data_ave(self, his, cols):
  90. self.logger.info("实际功率处理前时间范围:{}-{}".format(his.iloc[0, his.columns.get_loc('C_TIME')],
  91. his.iloc[-1, his.columns.get_loc('C_TIME')]))
  92. his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
  93. if not his_clean.empty:
  94. his_ave15 = his_clean.resample('15T', on='C_TIME', label='left').mean().reset_index()
  95. his_ave15['LIMIT_STATUS'] = his_ave15['LIMIT_STATUS'].apply(lambda x: x if x in [0, 1] else 1)
  96. hours = self.opt.Model["his_points"]
  97. his_ave15 = his_ave15.tail(hours)
  98. self.logger.info("实际功率处理后时间范围:{}-{}".format(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')], his_ave15.iloc[-1, his_ave15.columns.get_loc('C_TIME')]))
  99. requestHandler.update_cache_rp(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')])
  100. return his_ave15
  101. else:
  102. return his_clean
  103. def his_data_inst(self, his, cols):
  104. self.logger.info("实际功率处理前时间范围:{}-{}".format(his.iloc[0, his.columns.get_loc('C_TIME')], his.iloc[-1, his.columns.get_loc('C_TIME')]))
  105. # 先进行数据清洗和特征筛选
  106. his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
  107. if not his_clean.empty:
  108. # 创建一个新列来存储分钟数
  109. his_clean['minute'] = his_clean['C_TIME'].dt.minute
  110. # 应用这个函数来找到每行最接近的目标分钟数
  111. his_clean['closest_target_minute'] = his_clean['minute'].apply(self.closest_minute)
  112. his_clean['C_TIME'] = his_clean.apply(lambda x: x['C_TIME'].replace(minute=x['closest_target_minute']), axis=1)
  113. his_inst = his_clean.groupby('C_TIME').first().reset_index(drop=False)
  114. his_filter = his_inst[['C_TIME'] + cols]
  115. # self.logger.info(his_filter)
  116. his_filter.set_index('C_TIME', inplace=True)
  117. resample = pd.date_range(start=his_filter.index.min(), end=his_filter.index.max(), freq='15T', name='C_TIME')
  118. his_filter = his_filter.reindex(resample, fill_value=np.nan).reset_index(drop=False)
  119. # self.logger.info(his_filter)
  120. hours = self.opt.Model["his_points"]
  121. his_inst = his_filter.tail(hours)
  122. self.logger.info(
  123. "实际功率处理后时间范围:{}-{}".format(his_inst.iloc[0, his_inst.columns.get_loc('C_TIME')], his_inst.iloc[-1, his_inst.columns.get_loc('C_TIME')]))
  124. # self.logger.info(his_inst)
  125. requestHandler.update_cache_rp(his_inst.iloc[0, his_inst.columns.get_loc('C_TIME')])
  126. return his_inst
  127. else:
  128. return his_clean
  129. def closest_minute(self, minute):
  130. target_minutes = [0, 15, 30, 45]
  131. return min(target_minutes, key=lambda x: abs(x - minute) if x - minute <= 0 else 99)
  132. @classmethod
  133. def update_cache_rp(cls, begin):
  134. cls.history_rp = cls.history_rp[cls.history_rp['C_TIME'] >= begin].reset_index(drop=True)
  135. @classmethod
  136. def update_cache_env(cls, begin):
  137. cls.env = cls.env[cls.env['C_TIME'] >= begin].reset_index(drop=True)