inputData.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. import pandas as pd
  2. import datetime, time
  3. import pytz
  4. from savedata import saveData, readData
  5. from Arg import Arg
  6. from sqlalchemy import create_engine
  7. import pytz
  8. from data_cleaning import cleaning, rm_duplicated
  9. # def readData(name):
  10. # """
  11. # 读取数据
  12. # :param name: 名字
  13. # :return:
  14. # """
  15. # path = dataloc + r"/" + name
  16. # return pd.read_csv(path)
  17. #
  18. #
  19. # def saveData(name, data):
  20. # """
  21. # 存放数据
  22. # :param name: 名字
  23. # :param data: 数据
  24. # :return:
  25. # """
  26. # path = dataloc + r"/" + name
  27. # os.makedirs(os.path.dirname(path), exist_ok=True)
  28. # data.to_csv(path, index=False)
  29. def timestamp_to_datetime(ts):
  30. local_timezone = pytz.timezone('Asia/Shanghai')
  31. if type(ts) is not int:
  32. raise ValueError("timestamp-时间格式必须是整型")
  33. if len(str(ts)) == 13:
  34. dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
  35. return dt
  36. elif len(str(ts)) == 10:
  37. dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
  38. return dt
  39. else:
  40. raise ValueError("timestamp-时间格式错误")
  41. def timestr_to_timestamp(time_str):
  42. """
  43. 将时间戳或时间字符串转换为datetime.datetime类型
  44. :param time_data: int or str
  45. :return:datetime.datetime
  46. """
  47. if isinstance(time_str, str):
  48. if len(time_str) == 10:
  49. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
  50. return int(round(time.mktime(dt.timetuple())) * 1000)
  51. elif len(time_str) in {17, 18, 19}:
  52. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') # strptime字符串解析必须严格按照字符串中的格式
  53. return int(round(time.mktime(dt.timetuple())) * 1000) # 转换成毫秒级的时间戳
  54. else:
  55. raise ValueError("时间字符串长度不满足要求!")
  56. else:
  57. return time_str
  58. class DataBase(object):
  59. def __init__(self, arg):
  60. self.begin = datetime.datetime.strptime(arg.begin, '%Y-%m-%d')
  61. self.end = datetime.datetime.strptime(arg.end, '%Y-%m-%d') - pd.Timedelta(minutes=15)
  62. self.begin_stamp = timestr_to_timestamp(str(arg.begin))
  63. self.end_stamp = timestr_to_timestamp(str(self.end))
  64. self.database = arg.database
  65. self.towerloc = arg.towerloc
  66. self.turbineloc = arg.turbineloc
  67. self.dataloc = arg.dataloc
  68. def clear_data(self):
  69. """
  70. 删除所有csv
  71. :return:
  72. """
  73. # 设置文件夹路径
  74. import glob
  75. import os
  76. folder_path = self.dataloc
  77. # 使用 glob 获取所有的 .csv 文件路径
  78. csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
  79. # 遍历所有 .csv 文件并删除
  80. for file_path in csv_files:
  81. os.remove(file_path)
  82. print("清除所有csv文件")
  83. def create_database(self):
  84. """
  85. 创建数据库连接
  86. :param database: 数据库地址
  87. :return:
  88. """
  89. engine = create_engine(self.database)
  90. return engine
  91. def exec_sql(self, sql, engine):
  92. """
  93. 从数据库获取数据
  94. :param sql: sql语句
  95. :param engine: 数据库对象
  96. :return:
  97. """
  98. df = pd.read_sql_query(sql, engine)
  99. return df
  100. def split_time(self, data):
  101. data.set_index('C_TIME', inplace=True)
  102. data = data.sort_index().loc[self.begin: self.end]
  103. data.reset_index(drop=False, inplace=True)
  104. return data
  105. def get_process_NWP(self):
  106. """
  107. 从数据库中获取NWP数据,并进行简单处理
  108. :param database:
  109. :return:
  110. """
  111. # NPW数据
  112. engine = self.create_database()
  113. sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
  114. "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \
  115. "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
  116. "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp" \
  117. " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 风的NWP字段
  118. NWP = self.exec_sql(sql_NWP, engine)
  119. NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
  120. NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
  121. NWP = cleaning(NWP, 'NWP')
  122. # NWP = self.split_time(NWP)
  123. NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  124. saveData("NWP.csv", NWP)
  125. return NWP
  126. def get_process_tower(self):
  127. """
  128. 获取环境检测仪数据
  129. :param database:
  130. :return:
  131. """
  132. engine = self.create_database()
  133. print("提取测风塔:{}".format(self.towerloc))
  134. for i in self.towerloc:
  135. # 删除没用的列
  136. drop_colmns = ["C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"]
  137. get_colmns = []
  138. # 查询表的所有列名
  139. result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
  140. for name in result_set.iloc[:, 0]:
  141. if name not in drop_colmns:
  142. get_colmns.append(name)
  143. all_columns_str = ", ".join([f'{col}' for col in get_colmns])
  144. tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO="+str(i) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)
  145. tower = self.exec_sql(tower_sql, engine)
  146. tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
  147. saveData("/tower-{}.csv".format(i), tower)
  148. print("测风塔{}导出数据".format(i))
  149. def get_process_power(self):
  150. """
  151. 获取整体功率数据
  152. :param database:
  153. :return:
  154. """
  155. engine = self.create_database()
  156. sql_cap = "select C_CAPACITY from t_electric_field"
  157. cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
  158. sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
  159. " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
  160. # if self.opt.usable_power["clean_power_by_signal"]:
  161. # sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and C_IS_RATIONING_BY_AUTO_CONTROL=0"
  162. powers = self.exec_sql(sql_power, engine)
  163. powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
  164. mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap)
  165. mask = powers['C_REAL_VALUE'] == -99
  166. mask = mask | mask1
  167. print("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum()))
  168. powers = powers[~mask]
  169. print("剔除完后还剩{}条".format(len(powers)))
  170. binary_map = {b'\x00': 0, b'\x01': 1}
  171. powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
  172. powers = rm_duplicated(powers)
  173. saveData("power.csv", powers)
  174. def get_process_dq(self):
  175. """
  176. 获取短期预测结果
  177. :param database:
  178. :return:
  179. """
  180. engine = self.create_database()
  181. sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
  182. "where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
  183. dq = self.exec_sql(sql_dq, engine)
  184. # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
  185. dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
  186. # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
  187. # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
  188. dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
  189. dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  190. saveData("dq.csv", dq)
  191. def indep_process(self):
  192. """
  193. 进一步数据处理:时间统一处理等
  194. :return:
  195. """
  196. # 测风塔数据处理
  197. for i in self.towerloc:
  198. tower = readData("/tower-{}.csv".format(i))
  199. tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
  200. # 判断每一列是否全是 -99
  201. all_minus_99 = (tower == -99).all()
  202. # 获取全是 -99 的列的列名
  203. cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
  204. # 使用 drop() 方法删除列
  205. tower = tower.drop(cols_to_drop, axis=1)
  206. tower = cleaning(tower, 'tower', ['C_WS_INST_HUB_HEIGHT'])
  207. saveData("/tower-{}-process.csv".format(i), tower)
  208. def get_process_cdq(self):
  209. """
  210. 获取超短期预测结果
  211. :param database:
  212. :return:
  213. """
  214. engine = self.create_database()
  215. sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \
  216. " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
  217. cdq = self.exec_sql(sql_cdq, engine)
  218. cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
  219. cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
  220. # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
  221. cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
  222. saveData("cdq.csv", cdq)
  223. def get_process_turbine(self):
  224. """
  225. 从数据库中获取风头数据,并进行简单处理
  226. :param database:
  227. :return:
  228. """
  229. for number in self.turbineloc:
  230. # number = self.opt.usable_power['turbine_id']
  231. # 机头数据
  232. engine = self.create_database()
  233. print("导出风机{}的数据".format(number))
  234. sql_turbine = "select C_TIME, C_WS, C_WD, C_ACTIVE_POWER from t_wind_turbine_status_data " \
  235. "WHERE C_EQUIPMENT_NO=" + str(number) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end) # + " and C_WS>0 and C_ACTIVE_POWER>0"
  236. turbine = self.exec_sql(sql_turbine, engine)
  237. turbine = cleaning(turbine, 'cdq', cols=['C_WS', 'C_ACTIVE_POWER'], dup=False)
  238. turbine['C_TIME'] = pd.to_datetime(turbine['C_TIME'])
  239. turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
  240. # 直接导出所有数据
  241. saveData("turbine-15/turbine-{}.csv".format(number), turbine)
  242. def data_process(self):
  243. """
  244. 数据导出+初步处理的总操控代码
  245. :param database:
  246. :return:
  247. """
  248. self.clear_data()
  249. try:
  250. self.get_process_power()
  251. self.get_process_dq()
  252. self.get_process_cdq()
  253. self.get_process_NWP()
  254. self.get_process_tower()
  255. # self.get_process_turbine()
  256. self.indep_process()
  257. except Exception as e:
  258. print("导出数据出错:{}".format(e.args))