request.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/3/31 11:14
  4. # file: requestHandler.py
  5. # author: David
  6. # company: shenyang JY
  7. import pandas as pd
  8. import numpy as np
  9. from cache.data_cleaning import cleaning, key_field_row_cleaning, rm_duplicated
  10. from validate import ValidationError
  11. class requestHandler(object):
  12. history_rp = None
  13. ws = None
  14. def __init__(self, log, args):
  15. self.logger = log
  16. self.args = args
  17. self.opt = args.parse_args_and_yaml()
  18. # def get_form_data1(self, req):
  19. # data = req.form.get("data")
  20. # if data is None:
  21. # raise ValidationError("data为空")
  22. # # 这个eval()函数会验证字符串是否转换成字典 字段为Null,传回来的是什么格式?
  23. # # {"字段": null} {"字段": } 都不会通过验证
  24. # jata = eval(data.replace("\t", ""))
  25. # nwp = pd.json_normalize(jata, record_path=["nwp"])
  26. # dq = pd.json_normalize(jata, record_path=["dq"])
  27. # history_dq = pd.json_normalize(jata, record_path=["history_dq"])
  28. # history_rp = pd.json_normalize(jata, record_path=["history_rp"])
  29. # ws = pd.json_normalize(jata, record_path=["env"])
  30. # nwp['C_TIME'] = pd.to_datetime(nwp['C_TIME'])
  31. # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'])
  32. # history_dq['C_TIME'] = pd.to_datetime(history_dq['C_TIME'])
  33. # history_rp['C_TIME'] = pd.to_datetime(history_rp['C_TIME'])
  34. # ws['C_TIME'] = pd.to_datetime(ws['C_TIME'])
  35. # return history_dq, history_rp, ws, nwp, dq
  36. def get_form_data(self, req):
  37. data = req.form.get("data")
  38. if data is None:
  39. raise ValidationError("data为空")
  40. # 这个eval()函数会验证字符串是否转换成字典 字段为Null,传回来的是什么格式?
  41. # {"字段": null} {"字段": } 都不会通过验证
  42. jata = eval(data.replace("\t", ""))
  43. nwp = pd.json_normalize(jata, record_path=["nwp"])
  44. dq = pd.json_normalize(jata, record_path=["dq"])
  45. history_dq = pd.json_normalize(jata, record_path=["history_dq"])
  46. history_rp1 = pd.json_normalize(jata, record_path=["history_rp"])
  47. ws1 = pd.json_normalize(jata, record_path=["env"])
  48. nwp['C_TIME'] = pd.to_datetime(nwp['C_TIME'])
  49. dq['C_TIME'] = pd.to_datetime(dq['C_TIME'])
  50. history_dq['C_TIME'] = pd.to_datetime(history_dq['C_TIME'])
  51. history_rp1['C_TIME'] = pd.to_datetime(history_rp1['C_TIME'])
  52. history_rp1 = rm_duplicated(history_rp1, self.logger)
  53. ws1['C_TIME'] = pd.to_datetime(ws1['C_TIME'])
  54. self.logger.info("-----历史数据长度,实际功率:{},测风塔:{}-----".format(len(history_rp1), len(ws1)))
  55. self.logger.info("历史数据最后一个点,实际功率:{},测风塔:{}".format(history_rp1.iloc[-1, history_rp1.columns.get_loc('C_TIME')], ws1.iloc[-1, ws1.columns.get_loc('C_TIME')]))
  56. # 1. 新增
  57. if len(history_rp1) == 1 and requestHandler.history_rp is not None:
  58. if requestHandler.history_rp.iloc[-1, requestHandler.history_rp.columns.get_loc('C_TIME')] < history_rp1.iloc[0, history_rp1.columns.get_loc('C_TIME')]:
  59. self.logger.info("合并前,实际功率:{}".format(len(requestHandler.history_rp)))
  60. requestHandler.history_rp = pd.concat([requestHandler.history_rp, history_rp1]).reset_index(drop=True)
  61. if requestHandler.ws.iloc[-1, requestHandler.ws.columns.get_loc('C_TIME')] < ws1.iloc[0, ws1.columns.get_loc('C_TIME')]:
  62. self.logger.info("合并前,测风塔:{}".format(len(requestHandler.ws)))
  63. requestHandler.ws = pd.concat([requestHandler.ws, ws1]).reset_index(drop=True)
  64. self.logger.info("新增方法:实际功率:{},测风塔:{}".format(len(requestHandler.history_rp), len(requestHandler.ws)))
  65. # 2. 重更 或者 初始
  66. else:
  67. requestHandler.history_rp = history_rp1
  68. requestHandler.ws = ws1
  69. self.logger.info("重更或初始方法:实际功率:{},测风塔:{}".format(len(requestHandler.history_rp), len(requestHandler.ws)))
  70. # 3. 取测风塔均值、更新缓存
  71. env_columns = [ele for ele in self.opt.env_columns if ele not in ['C_TIME', 'C_FP_VALUE', 'C_REAL_VALUE', 'error']]
  72. env_columns = env_columns if len(env_columns) > 0 else [self.opt.usable_power['env']]
  73. ws_ave = self.his_data_ave(requestHandler.ws, env_columns)
  74. rp_inst = self.his_data_inst(requestHandler.history_rp, ['C_REAL_VALUE', 'C_ABLE_VALUE', 'LIMIT_STATUS'])
  75. return history_dq, rp_inst, ws_ave, nwp, dq
  76. def his_data_ave(self, his, cols):
  77. # 历史数据最后一个点,往前推history_env_hours个小时的点数,不够返回errorCode=3,以测风塔数据取15分钟间隔的左边界为准
  78. # 1. 先进行数据清洗和特征筛选
  79. self.logger.info("测风塔处理前时间范围:{}-{}".format(his.iloc[0, his.columns.get_loc('C_TIME')], his.iloc[-1, his.columns.get_loc('C_TIME')]))
  80. if self.opt.Model["fusion"] is False:
  81. his = his.replace(-99, np.nan)
  82. his.set_index('C_TIME', inplace=True)
  83. his = his.interpolate(method='linear')
  84. his = his.fillna(method='ffill')
  85. his = his.fillna(method='bfill')
  86. his.reset_index(drop=False, inplace=True)
  87. his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
  88. if not his_clean.empty:
  89. self.logger.info("测风塔清洗后时间范围:{}-{}".format(his_clean.iloc[0, his_clean.columns.get_loc('C_TIME')], his_clean.iloc[-1, his_clean.columns.get_loc('C_TIME')]))
  90. his_filter = his_clean[['C_TIME']+cols]
  91. # 2. 取时刻范围均值,都没有返回-99
  92. his_ave15 = his_filter.resample('15T', on='C_TIME', label='left').mean().reset_index()
  93. self.logger.info("测风塔重采样时间范围:{}-{}".format(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')], his_ave15.iloc[-1, his_ave15.columns.get_loc('C_TIME')]))
  94. hours = self.opt.Model["his_points"]
  95. his_ave15 = his_ave15.tail(hours)
  96. self.logger.info("测风塔处理后时间范围:{}-{}".format(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')], his_ave15.iloc[-1, his_ave15.columns.get_loc('C_TIME')]))
  97. self.logger.info("更新缓存前,cls.ws第一个点时间:{}".format(requestHandler.ws.iloc[0, requestHandler.ws.columns.get_loc('C_TIME')]))
  98. requestHandler.update_cache_ws(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')])
  99. self.logger.info("更新缓存后,cls.ws第一个点时间:{}".format(requestHandler.ws.iloc[0, requestHandler.ws.columns.get_loc('C_TIME')]))
  100. self.logger.info("清洗后不为空,测风塔:{}".format(len(his_ave15)))
  101. return his_ave15
  102. else:
  103. return his_clean
  104. def rp_data_ave(self, his, cols):
  105. self.logger.info("实际功率处理前时间范围:{}-{}".format(his.iloc[0, his.columns.get_loc('C_TIME')], his.iloc[-1, his.columns.get_loc('C_TIME')]))
  106. his_clean = key_field_row_cleaning(his, cols=cols, logger=self.logger)
  107. if not his_clean.empty:
  108. his_ave15 = his_clean.resample('15T', on='C_TIME', label='left').mean().reset_index()
  109. his_ave15['LIMIT_STATUS'] = his_ave15['LIMIT_STATUS'].apply(lambda x: x if x in [0, 1] else 1)
  110. hours = self.opt.Model["his_points"]
  111. his_ave15 = his_ave15.tail(hours)
  112. self.logger.info("实际功率处理后时间范围:{}-{}".format(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')], his_ave15.iloc[-1, his_ave15.columns.get_loc('C_TIME')]))
  113. requestHandler.update_cache_rp(his_ave15.iloc[0, his_ave15.columns.get_loc('C_TIME')])
  114. return his_ave15
  115. else:
  116. return his_clean
  117. def his_data_inst(self, his, cols):
  118. self.logger.info("实际功率处理前时间范围:{}-{}".format(his.iloc[0, his.columns.get_loc('C_TIME')], his.iloc[-1, his.columns.get_loc('C_TIME')]))
  119. # 先进行数据清洗和特征筛选
  120. his_clean = key_field_row_cleaning(his, cols, logger=self.logger)
  121. if not his_clean.empty:
  122. # 创建一个新列来存储分钟数
  123. his_clean['minute'] = his_clean['C_TIME'].dt.minute
  124. # 应用这个函数来找到每行最接近的目标分钟数
  125. his_clean['closest_target_minute'] = his_clean['minute'].apply(self.closest_minute)
  126. his_clean['C_TIME'] = his_clean.apply(lambda x: x['C_TIME'].replace(minute=x['closest_target_minute']),
  127. axis=1)
  128. his_inst = his_clean.groupby('C_TIME').first().reset_index(drop=False)
  129. his_filter = his_inst[['C_TIME'] + cols]
  130. his_filter.set_index('C_TIME', inplace=True)
  131. resample = pd.date_range(start=his_filter.index.min(), end=his_filter.index.max(), freq='15T', name='C_TIME')
  132. his_filter = his_filter.reindex(resample, fill_value=np.nan).reset_index(drop=False)
  133. hours = self.opt.Model["his_points"]
  134. his_inst = his_filter.tail(hours)
  135. self.logger.info("实际功率处理后时间范围:{}-{}".format(his_inst.iloc[0, his_inst.columns.get_loc('C_TIME')], his_inst.iloc[-1, his_inst.columns.get_loc('C_TIME')]))
  136. requestHandler.update_cache_rp(his_inst.iloc[0, his_inst.columns.get_loc('C_TIME')])
  137. return his_inst
  138. else:
  139. return his_clean
  140. def closest_minute(self, minute):
  141. target_minutes = [0, 15, 30, 45]
  142. return min(target_minutes, key=lambda x: abs(x - minute) if x - minute <= 0 else 99)
  143. @classmethod
  144. def update_cache_rp(cls, begin):
  145. cls.history_rp = cls.history_rp[cls.history_rp['C_TIME'] >= begin].reset_index(drop=True)
  146. @classmethod
  147. def update_cache_ws(cls, begin):
  148. cls.ws = cls.ws[cls.ws['C_TIME'] >= begin].reset_index(drop=True)