# inputData.py
import datetime
import operator
import os
import time

import pandas as pd
import pymysql
import pytz
import yaml
from sqlalchemy import create_engine

from cache.data_cleaning import cleaning, rm_duplicated
# Absolute directory of this module; all exported CSV files are read from and
# written to the "data" sub-directory next to it.
current_path = os.path.dirname(__file__)
dataloc = current_path + '/data/'
  11. def readData(name):
  12. """
  13. 读取数据
  14. :param name: 名字
  15. :return:
  16. """
  17. path = dataloc + r"/" + name
  18. return pd.read_csv(path)
  19. def saveData(name, data):
  20. """
  21. 存放数据
  22. :param name: 名字
  23. :param data: 数据
  24. :return:
  25. """
  26. path = dataloc + r"/" + name
  27. os.makedirs(os.path.dirname(path), exist_ok=True)
  28. data.to_csv(path, index=False)
  29. def timestamp_to_datetime(ts):
  30. local_timezone = pytz.timezone('Asia/Shanghai')
  31. if type(ts) is not int:
  32. raise ValueError("timestamp-时间格式必须是整型")
  33. if len(str(ts)) == 13:
  34. dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
  35. return dt
  36. elif len(str(ts)) == 10:
  37. dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
  38. return dt
  39. else:
  40. raise ValueError("timestamp-时间格式错误")
  41. def dt_tag(dt):
  42. date = dt.replace(hour=0, minute=0, second=0)
  43. delta = (dt - date) / pd.Timedelta(minutes=15)
  44. return delta + 1
  45. def timestr_to_timestamp(time_str):
  46. """
  47. 将时间戳或时间字符串转换为datetime.datetime类型
  48. :param time_data: int or str
  49. :return:datetime.datetime
  50. """
  51. if isinstance(time_str, str):
  52. if len(time_str) == 10:
  53. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
  54. return int(round(time.mktime(dt.timetuple())) * 1000)
  55. elif len(time_str) in {17, 18, 19}:
  56. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') # strptime字符串解析必须严格按照字符串中的格式
  57. return int(round(time.mktime(dt.timetuple())) * 1000) # 转换成毫秒级的时间戳
  58. else:
  59. raise ValueError("时间字符串长度不满足要求!")
  60. else:
  61. return time_str
  62. class DataBase(object):
  63. def __init__(self, begin, end, opt, logger):
  64. self.opt = opt
  65. self.begin = begin
  66. self.his_begin = self.begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)
  67. self.end = end + pd.Timedelta(days=1) - pd.Timedelta(minutes=15)
  68. self.begin_stamp = timestr_to_timestamp(str(begin))
  69. self.his_begin_stamp = timestr_to_timestamp(str(self.his_begin))
  70. self.end_stamp = timestr_to_timestamp(str(self.end))
  71. self.database = opt.database
  72. self.logger = logger
  73. def clear_data(self):
  74. """
  75. 删除所有csv
  76. :return:
  77. """
  78. # 设置文件夹路径
  79. import glob
  80. import os
  81. folder_path = dataloc
  82. # 使用 glob 获取所有的 .csv 文件路径
  83. csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
  84. # 遍历所有 .csv 文件并删除
  85. for file_path in csv_files:
  86. os.remove(file_path)
  87. self.logger.info("清除所有csv文件")
  88. def create_database(self):
  89. """
  90. 创建数据库连接
  91. :param database: 数据库地址
  92. :return:
  93. """
  94. engine = create_engine(self.database)
  95. return engine
  96. def exec_sql(self, sql, engine):
  97. """
  98. 从数据库获取数据
  99. :param sql: sql语句
  100. :param engine: 数据库对象
  101. :return:
  102. """
  103. df = pd.read_sql_query(sql, engine)
  104. return df
  105. def split_time(self, data):
  106. data['C_TIME'] = pd.to_datetime(data["C_TIME"])
  107. data.set_index('C_TIME', inplace=True)
  108. data = data.sort_index().loc[self.begin: self.end]
  109. data.reset_index(drop=False, inplace=True)
  110. return data
  111. def get_process_NWP(self):
  112. """
  113. 从数据库中获取NWP数据,并进行简单处理
  114. :param database:
  115. :return:
  116. """
  117. # NPW数据
  118. engine = self.create_database()
  119. if self.opt.new_field:
  120. sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR, C_TPR," \
  121. "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, C_SOLAR_ZENITH," \
  122. "C_LCC, C_MCC, C_HCC, C_TCC, C_CLEARSKY_GHI, C_DNI_CALCD," \
  123. "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
  124. "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \
  125. " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 新NWP字段
  126. else:
  127. sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
  128. "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \
  129. "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
  130. "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \
  131. " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 老NWP字段
  132. NWP = self.exec_sql(sql_NWP, engine)
  133. NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
  134. NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
  135. NWP['DT_TAG'] = NWP.apply(lambda x: dt_tag(x['C_TIME']), axis=1)
  136. NWP = cleaning(NWP, 'NWP', self.logger)
  137. # NWP = self.split_time(NWP)
  138. NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  139. saveData("NWP.csv", NWP)
  140. self.logger.info("导出nwp数据")
  141. return NWP
  142. def get_process_weather(self):
  143. """
  144. 获取环境检测仪数据
  145. :param database:
  146. :return:
  147. """
  148. engine = self.create_database()
  149. self.logger.info("现有环境监测仪:{}".format(self.opt.weatherloc))
  150. for i in self.opt.weatherloc:
  151. # 删除没用的列
  152. drop_colmns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10", "C_STATUS", "C_IS_GENERATED","C_ABNORMAL_CODE"]
  153. get_colmns = []
  154. # 查询表的所有列名
  155. result_set = self.exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine)
  156. for name in result_set.iloc[:,0]:
  157. if name not in drop_colmns:
  158. get_colmns.append(name)
  159. all_columns_str = ", ".join([f'{col}' for col in get_colmns])
  160. weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO="+ str(i) + " and C_TIME between '{}' and '{}'".format(self.his_begin, self.end)
  161. weather = self.exec_sql(weather_sql, engine)
  162. weather['C_TIME'] = pd.to_datetime(weather['C_TIME'])
  163. # weather = self.split_time(weather)
  164. saveData("/weather-{}.csv".format(i), weather)
  165. self.logger.info("环境监测仪{}导出数据".format(i))
  166. def get_process_power(self):
  167. """
  168. 获取整体功率数据
  169. :param database:
  170. :return:
  171. """
  172. engine = self.create_database()
  173. sql_cap = "select C_CAPACITY from t_electric_field"
  174. cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
  175. self.opt.cap = float(cap)
  176. sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_REFERENCE_POWER_BY_SAMPLE, C_IS_RATIONING_BY_MANUAL_CONTROL," \
  177. " C_IS_RATIONING_BY_AUTO_CONTROL from t_power_station_status_data" \
  178. " where C_TIME between '{}' and '{}'".format(self.his_begin, self.end)
  179. powers = self.exec_sql(sql_power, engine)
  180. mask2 = powers[self.opt.predict] < 0
  181. mask1 = powers.loc[:, 'C_REAL_VALUE'].astype(float) > float(cap)
  182. mask = powers['C_REAL_VALUE'] == -99
  183. mask = mask | mask1 | mask2
  184. self.logger.info("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum()))
  185. powers = powers[~mask]
  186. self.logger.info("剔除完后还剩{}条".format(len(powers)))
  187. powers.reset_index(drop=True, inplace=True)
  188. binary_map = {b'\x00': 0, b'\x01': 1}
  189. powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
  190. powers = rm_duplicated(powers, self.logger)
  191. saveData("power.csv", powers)
  192. def get_process_dq(self):
  193. """
  194. 获取短期预测结果
  195. :param database:
  196. :return:
  197. """
  198. engine = self.create_database()
  199. sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
  200. "where C_FORECAST_TIME between {} and {}".format(self.his_begin_stamp, self.end_stamp)
  201. dq = self.exec_sql(sql_dq, engine)
  202. # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
  203. dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
  204. # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
  205. # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
  206. dq = cleaning(dq, 'dq', self.logger, cols=['C_FP_VALUE'])
  207. dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  208. saveData("dq.csv", dq)
  209. self.logger.info("导出dq数据")
  210. def get_process_cdq(self):
  211. """
  212. 获取超短期预测结果
  213. :param database:
  214. :return:
  215. """
  216. engine = self.create_database()
  217. sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from " \
  218. "t_forecast_power_ultra_short_term_his" \
  219. " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
  220. cdq = self.exec_sql(sql_cdq, engine)
  221. cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
  222. cdq = cleaning(cdq, 'cdq', self.logger, cols=['C_ABLE_VALUE'], dup=False)
  223. # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
  224. cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  225. saveData("cdq.csv", cdq)
  226. def indep_process(self):
  227. """
  228. 进一步数据处理:时间统一处理等
  229. :return:
  230. """
  231. # 环境监测仪数据处理
  232. for i in self.opt.weatherloc:
  233. weather = readData("/weather-{}.csv".format(i))
  234. env_columns = [ele for ele in self.opt.env_columns if ele not in ['C_TIME', 'C_FP_VALUE', 'C_REAL_VALUE', 'error']]
  235. weather = cleaning(weather, 'weather', self.logger, cols=env_columns)
  236. weather = weather[weather[self.opt.usable_power["env"]] >= 0]
  237. weather['C_TIME'] = pd.to_datetime(weather['C_TIME'])
  238. weather_ave = weather.resample('15T', on='C_TIME').mean().reset_index()
  239. weather_ave = weather_ave.dropna(subset=[self.opt.usable_power['env']])
  240. weather_ave.set_index('C_TIME', inplace=True)
  241. weather_ave = weather_ave.interpolate(method='linear')
  242. weather_ave = weather_ave.fillna(method='ffill')
  243. weather_ave = weather_ave.fillna(method='bfill')
  244. weather_ave.reset_index(drop=False, inplace=True)
  245. weather_ave.iloc[:, 1:] = weather_ave.iloc[:, 1:].round(2)
  246. saveData("/weather-{}-process.csv".format(i), weather_ave)
  247. def data_process(self):
  248. """
  249. 数据导出+初步处理的总操控代码
  250. :param database:
  251. :return:
  252. """
  253. self.clear_data()
  254. try:
  255. self.get_process_power()
  256. self.get_process_dq()
  257. self.get_process_cdq()
  258. self.get_process_NWP()
  259. self.get_process_weather()
  260. self.indep_process()
  261. except Exception as e:
  262. self.logger.critical("导出数据出错:{}".format(e.args))