# inputData.py (9.7 KB)
  1. import pandas as pd
  2. import datetime, time
  3. import os
  4. from sqlalchemy import create_engine
  5. import pytz, savedata
  6. from data_cleaning import cleaning, rm_duplicated
  7. current_path = os.path.dirname(__file__)
  8. dataloc = current_path + '/data/'
  9. weatherloc = [1]
  10. def readData(name):
  11. """
  12. 读取数据
  13. :param name: 名字
  14. :return:
  15. """
  16. path = dataloc + r"/" + name
  17. return pd.read_csv(path)
  18. def saveData(name, data):
  19. """
  20. 存放数据
  21. :param name: 名字
  22. :param data: 数据
  23. :return:
  24. """
  25. path = dataloc + r"/" + name
  26. os.makedirs(os.path.dirname(path), exist_ok=True)
  27. data.to_csv(path, index=False)
  28. def timestamp_to_datetime(ts):
  29. local_timezone = pytz.timezone('Asia/Shanghai')
  30. if type(ts) is not int:
  31. raise ValueError("timestamp-时间格式必须是整型")
  32. if len(str(ts)) == 13:
  33. dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
  34. return dt
  35. elif len(str(ts)) == 10:
  36. dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
  37. return dt
  38. else:
  39. raise ValueError("timestamp-时间格式错误")
  40. def timestr_to_timestamp(time_str):
  41. """
  42. 将时间戳或时间字符串转换为datetime.datetime类型
  43. :param time_data: int or str
  44. :return:datetime.datetime
  45. """
  46. if isinstance(time_str, str):
  47. if len(time_str) == 10:
  48. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
  49. return int(round(time.mktime(dt.timetuple())) * 1000)
  50. elif len(time_str) in {17, 18, 19}:
  51. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') # strptime字符串解析必须严格按照字符串中的格式
  52. return int(round(time.mktime(dt.timetuple())) * 1000) # 转换成毫秒级的时间戳
  53. else:
  54. raise ValueError("时间字符串长度不满足要求!")
  55. else:
  56. return time_str
  57. class DataBase(object):
  58. def __init__(self, begin, end, database):
  59. self.begin = begin
  60. self.end = end - pd.Timedelta(minutes=15)
  61. self.begin_stamp = timestr_to_timestamp(str(begin))
  62. self.end_stamp = timestr_to_timestamp(str(self.end))
  63. self.database = database
  64. def clear_data(self):
  65. """
  66. 删除所有csv
  67. :return:
  68. """
  69. # 设置文件夹路径
  70. import glob
  71. import os
  72. folder_path = dataloc
  73. # 使用 glob 获取所有的 .csv 文件路径
  74. csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
  75. # 遍历所有 .csv 文件并删除
  76. for file_path in csv_files:
  77. os.remove(file_path)
  78. # self.logger.info("清除所有csv文件")
  79. def create_database(self):
  80. """
  81. 创建数据库连接
  82. :param database: 数据库地址
  83. :return:
  84. """
  85. engine = create_engine(self.database)
  86. return engine
  87. def exec_sql(self, sql, engine):
  88. """
  89. 从数据库获取数据
  90. :param sql: sql语句
  91. :param engine: 数据库对象
  92. :return:
  93. """
  94. df = pd.read_sql_query(sql, engine)
  95. return df
  96. def split_time(self, data):
  97. data['C_TIME'] = pd.to_datetime(data["C_TIME"])
  98. data.set_index('C_TIME', inplace=True)
  99. data = data.sort_index().loc[self.begin: self.end]
  100. data.reset_index(drop=False, inplace=True)
  101. return data
  102. def get_process_NWP(self):
  103. """
  104. 从数据库中获取NWP数据,并进行简单处理
  105. :param database:
  106. :return:
  107. """
  108. # NPW数据
  109. engine = self.create_database()
  110. sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
  111. "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \
  112. "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
  113. "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \
  114. " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 光的NWP字段
  115. NWP = self.exec_sql(sql_NWP, engine)
  116. NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
  117. NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
  118. NWP = cleaning(NWP, 'NWP')
  119. # NWP = self.split_time(NWP)
  120. NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  121. savedata.saveData("NWP.csv", NWP)
  122. print("导出nwp数据")
  123. return NWP
  124. def get_process_weather(self):
  125. """
  126. 获取环境检测仪数据
  127. :param database:
  128. :return:
  129. """
  130. engine = self.create_database()
  131. print("现有环境监测仪:{}".format(weatherloc))
  132. for i in weatherloc:
  133. # 删除没用的列
  134. drop_colmns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10", "C_STATUS", "C_IS_GENERATED","C_ABNORMAL_CODE"]
  135. get_colmns = []
  136. # 查询表的所有列名
  137. result_set = self.exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine)
  138. for name in result_set.iloc[:,0]:
  139. if name not in drop_colmns:
  140. get_colmns.append(name)
  141. all_columns_str = ", ".join([f'{col}' for col in get_colmns])
  142. weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO="+ str(i) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)
  143. weather = self.exec_sql(weather_sql, engine)
  144. weather['C_TIME'] = pd.to_datetime(weather['C_TIME'])
  145. # weather = self.split_time(weather)
  146. savedata.saveData("/weather-{}.csv".format(i), weather)
  147. print("环境监测仪{}导出数据".format(i))
  148. def get_process_power(self):
  149. """
  150. 获取整体功率数据
  151. :param database:
  152. :return:
  153. """
  154. engine = self.create_database()
  155. sql_cap = "select C_CAPACITY from t_electric_field"
  156. cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
  157. sql_power = "select C_TIME, C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
  158. " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
  159. # if self.opt.usable_power["clean_power_by_signal"]:
  160. # sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and C_IS_RATIONING_BY_AUTO_CONTROL=0"
  161. powers = self.exec_sql(sql_power, engine)
  162. mask1 = powers.loc[:, 'C_REAL_VALUE'].astype(float) > float(cap)
  163. mask = powers['C_REAL_VALUE'] == -99
  164. mask = mask | mask1
  165. print("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum()))
  166. powers = powers[~mask]
  167. print("剔除完后还剩{}条".format(len(powers)))
  168. powers.reset_index(drop=True, inplace=True)
  169. binary_map = {b'\x00': 0, b'\x01': 1}
  170. powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
  171. powers = rm_duplicated(powers)
  172. savedata.saveData("power.csv", powers)
  173. def get_process_dq(self):
  174. """
  175. 获取短期预测结果
  176. :param database:
  177. :return:
  178. """
  179. engine = self.create_database()
  180. sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
  181. "where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
  182. dq = self.exec_sql(sql_dq, engine)
  183. # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
  184. dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
  185. # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
  186. # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
  187. dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
  188. dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  189. savedata.saveData("dq.csv", dq)
  190. print("导出dq数据")
  191. def get_process_cdq(self):
  192. """
  193. 获取超短期预测结果
  194. :param database:
  195. :return:
  196. """
  197. engine = self.create_database()
  198. sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from " \
  199. "t_forecast_power_ultra_short_term_his" \
  200. " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
  201. cdq = self.exec_sql(sql_cdq, engine)
  202. cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
  203. cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
  204. # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
  205. cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  206. savedata.saveData("cdq.csv", cdq)
  207. def indep_process(self):
  208. """
  209. 进一步数据处理:时间统一处理等
  210. :return:
  211. """
  212. # 环境监测仪数据处理
  213. for i in weatherloc:
  214. weather = savedata.readData("/weather-{}.csv".format(i))
  215. weather = cleaning(weather, 'weather', cols=['C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_RH', 'C_AIRT', 'C_P', 'C_WS', 'C_WD'])
  216. savedata.saveData("/weather-{}-process.csv".format(i), weather)
  217. def data_process(self):
  218. """
  219. 数据导出+初步处理的总操控代码
  220. :param database:
  221. :return:
  222. """
  223. # self.clear_data()
  224. # try:
  225. self.get_process_power()
  226. self.get_process_dq()
  227. # self.get_process_cdq()
  228. self.get_process_NWP()
  229. self.get_process_weather()
  230. self.indep_process()
  231. # except Exception as e:
  232. # print("导出数据出错:{}".format(e.args))