# inputData.py
import pymysql
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import matplotlib.pyplot as plt
import pytz

# Use the SimHei font so matplotlib can render the Chinese labels/messages used below.
plt.rcParams['font.sans-serif'] = ['SimHei']

import utils.savedata
from utils import Arg
from norm import Normalize

# Module-level singletons shared by every function in this file:
# - arg holds project configuration (data locations, device id lists, norm file path, ...)
# - norm accumulates normalization statistics over the exported tables
arg = Arg.Arg()
norm = Normalize()
  13. def clear_data():
  14. """
  15. 删除所有csv
  16. :return:
  17. """
  18. # 设置文件夹路径
  19. import glob
  20. import os
  21. folder_path = arg.dataloc
  22. # 使用 glob 获取所有的 .csv 文件路径
  23. csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
  24. # 遍历所有 .csv 文件并删除
  25. for file_path in csv_files:
  26. os.remove(file_path)
  27. print("清除所有scv文件")
  28. def create_database(database):
  29. """
  30. 创建数据库连接
  31. :param database: 数据库地址
  32. :return:
  33. """
  34. engine = create_engine(database)
  35. return engine
  36. def exec_sql(sql,engine):
  37. """
  38. 从数据库获取数据
  39. :param sql: sql语句
  40. :param engine: 数据库对象
  41. :return:
  42. """
  43. df = pd.read_sql_query(sql, engine)
  44. return df
def get_process_NWP(database):
    """
    Fetch NWP (numerical weather prediction) data from the database, convert
    its timestamp to local time, save it to NWP.csv and feed it to the
    normalizer.

    :param database: database connection string
    :return: the processed NWP DataFrame
    """
    # NWP data
    engine = create_database(database)
    sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR, C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"  # NWP fields (photovoltaic)
    NWP = exec_sql(sql_NWP, engine)
    # Drop the last three digits — presumably a millisecond epoch trimmed to seconds; confirm against the DB schema
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(str)
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].str[:-3]
    # Convert the timestamp column to datetime (seconds since the epoch)
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(float)
    NWP['C_PRE_TIME'] = pd.to_datetime(NWP['C_PRE_TIME'], unit='s')
    # Convert from UTC to the local timezone
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.tz_localize(pytz.utc).dt.tz_convert('Asia/Shanghai')
    # Format as "YYYY-mm-dd HH:MM:SS" strings
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
    NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
    utils.savedata.saveData("NWP.csv", NWP)
    norm.normalize(NWP)
    return NWP
def get_process_weather(database):
    """
    Export the environment-monitor (weather station) data, one CSV per
    station listed in arg.weatherloc, and feed each table to the normalizer.

    :param database: database connection string
    :return: None
    """
    engine = create_database(database)
    print("现有环境监测仪:{}".format(arg.weatherloc))
    for i in arg.weatherloc:
        print("环境监测仪{}导出数据".format(i))
        # columns that carry no useful signal and are dropped from the export
        drop_colmns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1", "C_DATA2", "C_DATA3", "C_DATA4", "C_DATA5", "C_DATA6", "C_DATA7", "C_DATA8", "C_DATA9", "C_DATA10", "C_STATUS", "C_IS_GENERATED", "C_ABNORMAL_CODE"]
        get_colmns = []
        # query all column names of the table
        result_set = exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine)
        for name in result_set.iloc[:, 0]:
            if name not in drop_colmns:
                get_colmns.append(name)
        all_columns_str = ", ".join([f'{col}' for col in get_colmns])
        # NOTE(review): SQL built by string concatenation — safe here only because
        # i comes from project config (arg.weatherloc), not user input
        weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO=" + str(i)
        weather = exec_sql(weather_sql, engine)
        utils.savedata.saveData("weather/weather-{}.csv".format(i), weather)
        norm.normalize(weather)
def get_process_power(database):
    """
    Export the plant-level power data: filter out invalid samples, save the
    raw series, then aggregate it to one averaged sample per 5 minutes.

    :param database: database connection string
    :return: None
    """
    engine = create_database(database)
    sql_cap = "select C_CAPACITY from t_electric_field"
    # assumes t_electric_field holds a single row — TODO confirm
    cap = exec_sql(sql_cap, engine)['C_CAPACITY']
    sql_power = "select C_TIME,C_REAL_VALUE from t_power_station_status_data"
    powers = exec_sql(sql_power, engine)
    # drop rows above the installed capacity or equal to -99 (missing-value marker)
    mask1 = powers['C_REAL_VALUE'] > float(cap)
    mask = powers['C_REAL_VALUE'] == -99
    mask = mask | mask1
    print("要剔除功率有{}条".format(mask.sum()))
    powers = powers[~mask]
    utils.savedata.saveData("power.csv", powers)
    power5, power_index = [], [0]  # 5-minute power table, boundary-index table
    ps = 0
    # build the 5-minute series: accumulate values and emit the running
    # average whenever the minute ends in 0 or 5 (a 5-minute boundary)
    for i, power in powers.iterrows():
        real_value = power['C_REAL_VALUE']
        ps += real_value
        if str(power['C_TIME'].minute)[-1] in ('0', '5'):
            power_index.append(i)
            # number of raw rows since the previous boundary (index distance)
            num = power_index[-1] - power_index[-2]
            num = num if num != 0 else 1  # guard against division by zero
            psa = round(ps / num, 2)
            power5.append([power['C_TIME'], psa])
            ps = 0
    power5 = pd.DataFrame(power5, columns=['C_TIME', 'C_REAL_VALUE'])
    utils.savedata.saveData("power5.csv", power5)
    norm.normalize(power5)
  125. def get_process_dq(database):
  126. """
  127. 获取短期预测结果
  128. :param database:
  129. :return:
  130. """
  131. engine = create_database(database)
  132. sql_dq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_short_term_his"
  133. dq = exec_sql(sql_dq, engine)
  134. dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
  135. utils.savedata.saveData("dq.csv", dq)
  136. def get_process_cdq(database):
  137. """
  138. 获取超短期预测结果
  139. :param database:
  140. :return:
  141. """
  142. engine = create_database(database)
  143. sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_ultra_short_term_his"
  144. cdq = exec_sql(sql_cdq, engine)
  145. cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'], unit='ms')
  146. utils.savedata.saveData("cdq.csv", cdq)
def indep_process():
    """
    Second-stage processing: clean the weather-station CSVs and then trim
    every exported table to the time range shared by all of them.

    :return: None
    """
    # weather-station data cleanup
    for i in arg.weatherloc:
        weather = utils.savedata.readData("/weather/weather-{}.csv".format(i))
        # check whether each column is entirely -99
        all_minus_99 = (weather == -99).all()
        # names of the all -99 columns
        cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
        # drop those columns
        weather = weather.drop(cols_to_drop, axis=1)
        # MBD: treat remaining -99 entries as NaN for the sparsity check
        weather_nan = weather.replace(-99, np.nan, inplace=False)
        # drop columns that are more than 20% NaN (keep those with >= 80% valid rows)
        weather = weather.dropna(axis=1, thresh=len(weather_nan) * 0.8)
        weather = weather.replace(np.nan, -99, inplace=False)
        # drop columns whose value is constant across all rows
        weather = weather.loc[:, (weather != weather.iloc[0]).any()]
        utils.savedata.saveData("/weather/weather-{}-process.csv".format(i), weather)
    # time unification (the cross-tower alignment is currently disabled)
    weather1 = utils.savedata.readData("/weather/weather-{}-process.csv".format(1))
    # tower2 = utils.savedata.readData("/tower/tower-{}-process.csv".format(2))
    # tower1 = tower1[tower1['C_TIME'].isin(tower2['C_TIME'])]
    # tower2 = tower2[tower2['C_TIME'].isin(tower1['C_TIME'])]
    utils.savedata.saveData("/weather/weather-{}-process.csv".format(1), weather1)
    # utils.savedata.saveData("/tower/tower-{}-process.csv".format(2), tower2)
    # align all tables on a common time window
    filenames = ["/NWP.csv", "/power.csv", "power5.csv", "/dq.csv", "/cdq.csv", '/weather/weather-1-process.csv']
    dataframes = []
    for name in filenames:
        dataframes.append(utils.savedata.readData(name))
    # latest start time and earliest end time across all tables
    # NOTE(review): computed before the to_datetime conversion below, so these may
    # be strings compared lexicographically — works for ISO-formatted times only
    max_start_time = max(df['C_TIME'].min() for df in dataframes)
    min_end_time = min(df['C_TIME'].max() for df in dataframes)
    print(max_start_time)
    print(min_end_time)
    # keep only rows inside [max_start_time, min_end_time] in each DataFrame
    for i, df in enumerate(dataframes):
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # ensure the time column is datetime
        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
        # overwrite the original file with the trimmed table
        utils.savedata.saveData(filenames[i], df_filtered)
def NWP_indep_process():
    """
    Split the NWP data into N continuous datasets, cutting at every point
    where consecutive timestamps are more than 15 minutes apart.

    :return: list of the split DataFrames
    """
    # further NWP processing
    NWP = utils.savedata.readData("NWP.csv")
    df = pd.to_datetime(NWP['C_TIME'])
    time_diff = df.diff()
    time_diff_threshold = pd.Timedelta(minutes=15)
    # rows whose gap to the previous row exceeds 15 minutes
    missing_values = df[time_diff > time_diff_threshold]
    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
    print(missing_values)
    # persist the gap positions for later use (see Data_split)
    utils.savedata.saveVar("NWP_miss.pickle", missing_values)
    split_indices = []
    for i in range(len(missing_values)):
        if i == 0:
            split_indices.append((0, missing_values.index[i]))
        else:
            split_indices.append((missing_values.index[i - 1], missing_values.index[i]))
    # MBD: the final segment (last gap -> end) was originally missing
    # NOTE(review): raises IndexError when the data has no gaps at all — confirm
    split_indices.append((missing_values.index[-1], len(df)))
    split_datasets = [NWP.iloc[start:end, :] for start, end in split_indices]
    for i, split_df in enumerate(split_datasets):
        utils.savedata.saveData("Dataset_training/NWP/NWP_{}.csv".format(i), split_df)
    return split_datasets
  218. # def power_indep_process():
  219. # NWP = utils.savedata.readData("power.csv")
def Data_split():
    """
    (Unused diagnostic helper.) Report the gaps in the 15-minute power data:
    for each gap, print its length in 15-minute steps.

    :return: None
    """
    NWP = utils.savedata.readData("power_15min.csv")
    df = pd.to_datetime(NWP['C_TIME'])
    time_diff = df.diff()
    time_diff_threshold = pd.Timedelta(minutes=15)
    # rows whose gap to the previous row exceeds 15 minutes
    missing_values = df[time_diff > time_diff_threshold]
    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
    print(missing_values)
    # loaded but never used below — presumably intended for cross-checking the NWP gaps
    NWP_miss = utils.savedata.readVar("NWP_miss.pickle")
    for t in missing_values.index:
        a = t - 1
        b = t
        time1 = NWP['C_TIME'][a]
        time2 = NWP['C_TIME'][b]
        df = pd.to_datetime([time1, time2])
        # gap length measured in 15-minute steps
        time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
        print(time_diff)
    # hard-coded sanity check of the step arithmetic over a fixed interval
    time1 = "2022-10-27 14:00:00"
    time2 = "2023-04-16 12:00:00"
    df = pd.to_datetime([time1, time2])
    time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
    print(time_diff)
  248. def time_all_in():
  249. """
  250. 这个函数暂时没用上,这个函数的目的是给机头数据进行填充,找到时间缺失的位置,填充为-99
  251. :return:
  252. """
  253. filenames = []
  254. dataframes = []
  255. for i in arg.turbineloc:
  256. filenames.append("/turbine-15/turbine-{}.csv".format(i))
  257. for name in filenames:
  258. dataframes.append(utils.savedata.readData(name))
  259. for df in dataframes:
  260. df['C_TIME'] = pd.to_datetime(df['C_TIME'])
  261. # 创建一个完整的时间序列索引,包括所有可能的时间点
  262. start_time = df['C_TIME'].min()
  263. end_time = df['C_TIME'].max()
  264. full_time_range = pd.date_range(start_time, end_time, freq='15min')
  265. # 使用完整的时间序列索引创建一个空的 DataFrame
  266. full_df = pd.DataFrame(index=full_time_range)
  267. full_df.index.name = 'C_TIME'
  268. # 将原始数据与空的 DataFrame 合并
  269. merged_df = full_df.merge(df, how='left', left_on='time', right_on='time')
  270. # 使用 -99 填充缺失值,除了时间列
  271. merged_df.fillna(-99, inplace=True)
  272. merged_df.reset_index(inplace=True)
  273. def data_process(database):
  274. """
  275. 数据导出+初步处理的总操控代码
  276. :param database:
  277. :return:
  278. """
  279. clear_data()
  280. get_process_power(database)
  281. get_process_dq(database)
  282. get_process_cdq(database)
  283. get_process_NWP(database)
  284. get_process_weather(database)
  285. indep_process()
  286. NWP_indep_process()
  287. norm.save_yml({'mean': norm.mean, 'std': norm.std}, arg.normloc)
if __name__ == '__main__':
    import os
    import glob
    # data folder path
    folder_path = '../data'
    # collect all .csv files directly under it (non-recursive)
    csv_files = glob.glob(os.path.join(folder_path, '*.csv'))
    # delete each one
    for file_path in csv_files:
        os.remove(file_path)
    # database = "mysql+pymysql://root:!QAZ2root@192.168.1.205:3306/ipfcst-sishui-a"
    # engine = create_database(database)
    #
    # # NWP data
    # sql_NWP = "select C_SC_DATE,C_SC_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
    # NWP = exec_sql(sql_NWP,engine)
    #
    # # per-turbine power
    # sql_wind = "select C_WS,C_ACTIVE_POWER from t_wind_turbine_status_data_15 WHERE C_EQUIPMENT_NO=2 and C_WS>0 and C_ACTIVE_POWER>0 "
    # df_wind = exec_sql(sql_wind,engine)
    # print(df_wind)
    # total power data read
    # sql_power = "select * from t_power_station_status_data"
    # df_power = exec_sql(sql_power, engine)
    filenames = []
    dataframes = []
    for i in arg.turbineloc:
        filenames.append("../data/turbine-15/turbine-{}.csv".format(i))
    # load the first 7000 rows of each turbine's 15-minute CSV
    for name in filenames:
        dataframes.append(pd.read_csv(name).iloc[:7000, :])
    # for df in enumerate(dataframes):
    #     df =
    # mean wind speed across turbines per timestep
    mean_of_first_columns = pd.concat([df['C_WS'] for df in dataframes], axis=1).mean(axis=1)
    # summed active power across turbines; /1000 presumably converts kW to MW — confirm units
    mean_of_second_columns = (pd.concat([df['C_ACTIVE_POWER'] for df in dataframes], axis=1).sum(axis=1) / 1000).astype(int)
    print(len(mean_of_first_columns))
    # scatter plot of power vs wind speed (power curve)
    plt.scatter(mean_of_first_columns, mean_of_second_columns)
    plt.show()