# inputData.py
  1. import pandas as pd
  2. import datetime, time
  3. import re
  4. import os
  5. import pymysql
  6. from sqlalchemy import create_engine
  7. import pytz
  8. from data_cleaning import cleaning, rm_duplicated, key_field_row_cleaning
# Directory that holds the cached csv exports, resolved relative to this module.
current_path = os.path.dirname(__file__)
dataloc = current_path + '/data/'
  11. def readData(name):
  12. """
  13. 读取数据
  14. :param name: 名字
  15. :return:
  16. """
  17. path = r"./cache/data/" + name
  18. return pd.read_csv(path)
  19. def saveData(name, data):
  20. """
  21. 存放数据
  22. :param name: 名字
  23. :param data: 数据
  24. :return:
  25. """
  26. path = r"./cache/data/" + name
  27. os.makedirs(os.path.dirname(path), exist_ok=True)
  28. data.to_csv(path, index=False)
  29. def timestamp_to_datetime(ts):
  30. local_timezone = pytz.timezone('Asia/Shanghai')
  31. if type(ts) is not int:
  32. raise ValueError("timestamp-时间格式必须是整型")
  33. if len(str(ts)) == 13:
  34. dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
  35. return dt
  36. elif len(str(ts)) == 10:
  37. dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
  38. return dt
  39. else:
  40. raise ValueError("timestamp-时间格式错误")
  41. def dt_tag(dt):
  42. date = dt.replace(hour=0, minute=0, second=0)
  43. delta = (dt - date) / pd.Timedelta(minutes=15)
  44. return delta + 1
  45. def timestr_to_timestamp(time_str):
  46. """
  47. 将时间戳或时间字符串转换为datetime.datetime类型
  48. :param time_data: int or str
  49. :return:datetime.datetime
  50. """
  51. if isinstance(time_str, str):
  52. if len(time_str) == 10:
  53. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
  54. return int(round(time.mktime(dt.timetuple())) * 1000)
  55. elif len(time_str) in {17, 18, 19}:
  56. dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') # strptime字符串解析必须严格按照字符串中的格式
  57. return int(round(time.mktime(dt.timetuple())) * 1000) # 转换成毫秒级的时间戳
  58. else:
  59. raise ValueError("时间字符串长度不满足要求!")
  60. else:
  61. return time_str
class DataBase(object):
    """
    Export and pre-process station data (NWP, wind tower, power, turbines)
    from a MySQL database and local csv caches into ./cache/data/.
    """
    def __init__(self, begin, end, opt, logger):
        # NOTE(review): `end` is accepted but never stored, and
        # self.begin_stamp / self.his_begin_stamp / self.end_stamp / self.end
        # are only produced by the commented-out lines below. The methods
        # get_process_NWP / get_process_dq / get_process_cdq reference those
        # attributes and would raise AttributeError as-is — confirm whether
        # the commented lines should be restored before enabling them.
        self.begin = begin
        self.opt = opt
        # self.his_begin = self.begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)
        # self.end = end + pd.Timedelta(days=1) - pd.Timedelta(minutes=15)
        # self.begin_stamp = timestr_to_timestamp(str(begin))
        # self.his_begin_stamp = timestr_to_timestamp(str(self.his_begin))
        # self.end_stamp = timestr_to_timestamp(str(self.end))
        self.database = opt.database
        self.logger = logger
        # self.towerloc = self.opt.tower
    def clear_data(self):
        """
        Delete every csv file under the data directory (recursively).
        :return: None
        """
        # Directory to sweep.
        import glob
        import os
        folder_path = dataloc
        # Collect all .csv paths, including sub-directories.
        csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
        # Remove each csv file found.
        for file_path in csv_files:
            os.remove(file_path)
        self.logger.info("清除所有csv文件")
    def create_database(self):
        """
        Build the SQLAlchemy engine for the configured database URL.
        :return: SQLAlchemy engine
        """
        engine = create_engine(self.database)
        return engine
    def exec_sql(self, sql, engine):
        """
        Run a query against the database and return the result set.
        :param sql: SQL statement text
        :param engine: SQLAlchemy engine/connection
        :return: pandas.DataFrame with the query result
        """
        df = pd.read_sql_query(sql, engine)
        return df
    def get_process_NWP(self):
        """
        Fetch NWP (numerical weather prediction) rows from the database
        and apply light cleaning; result is also saved as NWP.csv.
        NOTE(review): depends on self.begin_stamp / self.end_stamp, which
        are commented out in __init__ — verify before calling.
        :return: cleaned NWP DataFrame
        """
        # NWP data.
        engine = self.create_database()
        sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
                  "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \
                  "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
                  "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp" \
                  " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)  # wind NWP columns
        NWP = self.exec_sql(sql_NWP, engine)
        # Forecast time arrives as an epoch timestamp; convert then rename.
        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
        # NWP['DT_TAG'] = NWP.apply(lambda x: dt_tag(x['C_TIME']), axis=1)
        NWP = cleaning(NWP, 'NWP')
        # NWP = self.split_time(NWP)
        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        saveData("NWP.csv", NWP)
        self.logger.info("导出nwp数据")
        return NWP
    def get_process_tower(self):
        """
        Export wind-tower (environment monitor) data: reads the raw table
        dump from the csv cache, keeps only useful columns, and saves one
        file per tower id.
        :return: None
        """
        engine = self.create_database()
        self.logger.info("提取测风塔:{}".format(self.opt.towerloc))
        for i in [self.opt.towerloc]:
            # Columns that carry no signal and are dropped.
            drop_colmns = ["C_ID","C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"]
            get_colmns = []
            # # Query the table's column names from the database schema.
            result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
            # The actual rows come from the cached csv dump, headered with
            # the schema column names fetched above.
            tower = pd.read_csv("./cache/data/t_wind_tower_status_data.csv")
            tower.columns = list(result_set['Field'].values)
            for name in result_set.iloc[:,0]:
                if name not in drop_colmns:
                    get_colmns.append(name)
            all_columns_str = ", ".join([f'{col}' for col in get_colmns])
            # tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO="+str(i) + " and C_TIME between '{}' and '{}'".format(self.his_begin, self.end)
            # tower = self.exec_sql(tower_sql, engine)
            tower = tower[all_columns_str.split(', ')]
            # tower.drop(columns=drop_colmns, inplace=True)
            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
            saveData("/tower-{}.csv".format(i), tower)
            self.logger.info("测风塔{}导出数据".format(i))
    def get_process_power(self):
        """
        Export the plant-level power series: merge the cached status dump
        with the curtailment ("受阻") csv, drop invalid power values
        (-99, negative, above capacity) and duplicates, then save.
        :return: None
        """
        powers = pd.read_csv('./cache/data/t_power_station_status_data.csv')
        shouzu = pd.read_csv('./cache/data/全场_原始数据_2024-11-23_16-22-42.csv')
        shouzu.rename(columns={'时间':'C_TIME', '场外受阻发电量': 'SHOUZU'}, inplace=True)
        shouzu['C_TIME'] = pd.to_datetime(shouzu['C_TIME'])
        # Keep only timestamps with no off-site curtailed generation.
        shouzu = shouzu[~(shouzu['SHOUZU'] > 0)]
        engine = self.create_database()
        sql_cap = "select C_CAPACITY from t_electric_field"
        cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
        self.opt.cap = float(cap)
        # Re-header the cached dump with the schema column names.
        result_set = self.exec_sql("SHOW COLUMNS FROM t_power_station_status_data", engine)
        powers.columns = list(result_set.iloc[:, 0].values)
        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
        # Inner merge: keeps only timestamps present in both sources.
        powers = pd.merge(powers, shouzu, on='C_TIME')
        powers = powers[['C_TIME', 'C_REAL_VALUE', 'C_ABLE_VALUE', 'C_IS_RATIONING_BY_MANUAL_CONTROL', 'C_IS_RATIONING_BY_AUTO_CONTROL']]
        # if self.opt.usable_power["clean_power_by_signal"]:
        #     sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and C_IS_RATIONING_BY_AUTO_CONTROL=0"
        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
        # Invalid readings: negative, above capacity, or the -99 sentinel.
        mask2 = powers['C_REAL_VALUE'] < 0
        mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap)
        mask = powers['C_REAL_VALUE'] == -99
        mask = mask | mask1 | mask2
        self.logger.info("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum()))
        powers = powers[~mask]
        self.logger.info("剔除完后还剩{}条".format(len(powers)))
        # binary_map = {b'\x00': 0, b'\x01': 1}
        # powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
        powers = rm_duplicated(powers)
        saveData("power_filter4.csv", powers)
    def get_process_dq(self):
        """
        Export short-term forecast ("短期") results to dq.csv.
        NOTE(review): depends on self.his_begin_stamp / self.end_stamp,
        which are commented out in __init__ — verify before calling.
        :return: None
        """
        engine = self.create_database()
        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
                 "where C_FORECAST_TIME between {} and {}".format(self.his_begin_stamp, self.end_stamp)
        dq = self.exec_sql(sql_dq, engine)
        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
        dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        saveData("dq.csv", dq)
    def indep_process(self):
        """
        Second-stage processing: clean the tower export, resample to the
        common 15-minute grid, and save a "-process" variant per tower.
        :return: None
        """
        # Wind-tower data processing.
        for i in [self.opt.towerloc]:
            tower = readData("/tower-{}.csv".format(i))
            tower = cleaning(tower, 'tower', [self.opt.usable_power["env"]])
            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
            # 15-minute mean aggregation on the time column.
            tower_ave = tower.resample('15T', on='C_TIME').mean().reset_index()
            tower_ave = tower_ave.dropna(subset=[self.opt.usable_power['env']])
            tower_ave.iloc[:, 1:] = tower_ave.iloc[:, 1:].round(2)
            saveData("/tower-{}-process.csv".format(i), tower_ave)
    def get_process_cdq(self):
        """
        Export ultra-short-term forecast ("超短期") results to cdq.csv.
        NOTE(review): depends on self.begin_stamp / self.end_stamp, which
        are commented out in __init__ — verify before calling.
        :return: None
        """
        engine = self.create_database()
        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \
                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
        cdq = self.exec_sql(sql_cdq, engine)
        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
        cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        saveData("cdq.csv", cdq)
    def get_process_turbine(self):
        """
        Export per-turbine data: join the cached turbine dump with the raw
        status csv (keeping only rows whose run-state codes are <= 3),
        clean key fields, keep quarter-hour samples, and save one csv per
        turbine id.
        :return: None
        """
        # Equipment numbers 33..64 mapped onto display names.
        cids = [x for x in range(33, 65, 1)]
        # NOTE(review): 'F10' appears twice in this list (once in the
        # literal block, once from range(10, 32)), so two equipment ids map
        # to the same display name — confirm whether range(11, 33) was meant.
        c_names = ['F01', 'F02', 'F03', 'F04', 'F05', 'F06', 'F07', 'F08', 'F09', 'F10']+['F'+str(x) for x in range(10, 32)]
        id_names = {id: c_names[x] for x, id in enumerate(cids)}
        turbines = pd.read_csv('./cache/data/turbines.csv')
        turbines_ori = pd.read_csv('./cache/data/全场_原始数据_2024-08-01_19-40-25.csv')
        # Strip any bracketed suffixes from the turbine display names.
        turbines_ori['风机'] = turbines_ori['风机'].apply(lambda x: re.sub(r'\(.*?\)|\[.*?\]|\{.*?\}', '', x))
        turbines_ori.rename(columns={"时间": "C_TIME"}, inplace=True)
        turbines_ori = turbines_ori[['C_TIME', '风机', '低位首触机组运行状态', '高位首触机组运行状态']]
        turbines_ori['C_TIME'] = pd.to_datetime(turbines_ori['C_TIME'])
        turbines['C_TIME'] = pd.to_datetime(turbines['C_TIME'])
        for number in self.opt.turbineloc:
            # number = self.opt.usable_power['turbine_id']
            # Turbine head data for this equipment number.
            turbine = turbines[turbines['C_EQUIPMENT_NO'] == number]
            # Keep only timestamps where both run-state codes are <= 3.
            turbine_ori = turbines_ori[(turbines_ori['风机'] == id_names[number]) & (turbines_ori['低位首触机组运行状态'] <=3) & (turbines_ori['高位首触机组运行状态'] <=3)]
            turbine = pd.merge(turbine, turbine_ori.loc[:, ["C_TIME", "风机"]], on="C_TIME")
            turbine.drop(columns=['风机'], inplace=True)
            turbine = key_field_row_cleaning(turbine, cols=['C_WS', 'C_ACTIVE_POWER'])
            # Keep only quarter-hour samples (:00, :15, :30, :45).
            turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
            # Export all remaining rows directly.
            saveData("turbine-{}.csv".format(number), turbine)
    def process_csv_files(self, input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled
        """
        Clean every turbine csv: remove -99 sentinel rows and runs of N
        identical consecutive values, logging per-file statistics.
        :param input_dir: directory with turbine-<i>.csv inputs
        :param output_dir: directory for cleaned outputs
        :param M: unused here — NOTE(review): confirm whether it can be dropped
        :param N: minimum streak length treated as abnormal
        :return: None
        """
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        for i in self.opt.turbineloc:
            input_file = os.path.join(input_dir, f"turbine-{i}.csv")
            output_file = os.path.join(output_dir, f"turbine-{i}.csv")
            # Read the csv file.
            df = pd.read_csv(input_file)
            # Remove abnormal values and collect statistics about them.
            df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = self.remove_abnormal_values(df, N)
            # Log the abnormal-value statistics.
            self.logger.info(f"处理文件:{input_file}")
            self.logger.info(f"剔除 -99 点异常值数量:{count_abnormal1}")
            self.logger.info(f"剔除连续异常值数量:{count_abnormal2}")
            self.logger.info(f"总共剔除数据量:{total_removed}")
            self.logger.info(f"剔除的连续异常值具体数值:{removed_continuous_values}\n")
            # Save the cleaned csv file.
            df_clean.to_csv(output_file, index=False)
    def remove_abnormal_values(self,df, N):
        """
        Flag and drop abnormal rows in a turbine DataFrame.
        :param df: turbine data with C_WS, C_WD, C_ACTIVE_POWER columns
        :param N: minimum length of a constant streak to mark abnormal
        :return: (cleaned df, -99 count, streak count, total removed,
                  dict of unique streak values per column)
        """
        # Rows whose C_ACTIVE_POWER is the -99 sentinel.
        abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
        count_abnormal1 = abnormal_mask1.sum()
        # Rows where C_WS / C_WD / C_ACTIVE_POWER stay constant for N rows.
        columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
        abnormal_mask2 = self.mark_abnormal_streaks(df, columns, N)
        count_abnormal2 = abnormal_mask2.sum()
        # Combined boolean mask of all abnormal rows.
        abnormal_mask = abnormal_mask1 | abnormal_mask2
        # The distinct values seen in the flagged streaks, per column.
        removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
        # Drop the abnormal rows.
        df_clean = df[~abnormal_mask]
        total_removed = abnormal_mask.sum()
        return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values
    # —————————————————— Sum generation power of turbines per zone ——————————————————
    def zone_powers(self, input_dir):
        """
        Aggregate per-turbine active power into per-zone totals and save
        the result as z-power-t.csv.
        NOTE(review): assumes all turbine files are already time-aligned
        (positional concat, no join on C_TIME) — TimeMerge must run first.
        :param input_dir: directory holding the aligned turbine csvs
        :return: None
        """
        z_power = {}
        for zone, turbines in self.opt.zone.items():
            dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv")) for z in self.opt.turbineloc if z in turbines]
            z_power['C_TIME'] = dfs[0]['C_TIME']
            sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1)
            z_power[zone] = sum_power
        z_power = pd.DataFrame(z_power)
        z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
        saveData("z-power-t.csv", z_power)
    # —————————————————— Cleaning of -99 and constant-streak wind-speed values ——————————————————
    def mark_abnormal_streaks(self, df, columns, min_streak):
        """
        Build a boolean mask marking rows that belong to a streak of at
        least min_streak consecutive rows with identical values in all
        the given columns.
        NOTE(review): uses df.at[i, col] with positional i — assumes a
        default RangeIndex on df; verify for callers with filtered frames.
        :param df: input DataFrame
        :param columns: column names that must all stay constant
        :param min_streak: minimum streak length to flag
        :return: pandas.Series of bool aligned to df.index
        """
        abnormal_mask = pd.Series(False, index=df.index)
        streak_start = None
        for i in range(len(df)):
            # A change in any tracked column restarts the streak.
            if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
                streak_start = i
            # Once the streak is long enough, flag its whole extent.
            if i - streak_start >= min_streak - 1:
                abnormal_mask[i - min_streak + 1:i + 1] = True
        return abnormal_mask
    # —————————————————— Per-turbine time alignment ——————————————————
    def TimeMerge(self, input_dir, output_dir, M):
        """
        Keep only the timestamps shared by every turbine csv, so all files
        end up on an identical time axis, then rewrite them.
        :param input_dir: directory with turbine-<i>.csv inputs
        :param output_dir: directory for aligned outputs
        :param M: unused here — NOTE(review): confirm whether it can be dropped
        :return: None
        """
        # Load every turbine csv.
        files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in self.opt.turbineloc]
        dataframes = [pd.read_csv(f) for f in files]
        # Intersect the C_TIME columns across all files.
        c_time_intersection = set(dataframes[0]["C_TIME"])
        for df in dataframes[1:]:
            c_time_intersection.intersection_update(df["C_TIME"])
        # Keep only rows whose timestamp is in the intersection.
        filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)] for df in dataframes]
        # Write each filtered DataFrame to a new csv.
        os.makedirs(output_dir, exist_ok=True)
        for (filtered_df, i) in zip(filtered_dataframes, self.opt.turbineloc):
            if i == 144:
                # NOTE(review): unit fix (kW -> MW?) for one specific
                # turbine id — confirm; ids here are 33..64 so this branch
                # may be dead for the current turbineloc.
                filtered_df['C_ACTIVE_POWER'] /= 1000
            filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
    def data_process(self):
        """
        Master pipeline: export + first-stage processing. Only the
        csv-cache-driven steps are currently enabled; DB-window steps
        remain commented out.
        :return: None
        """
        # self.clear_data()
        self.get_process_power()
        # self.get_process_dq()
        # self.get_process_cdq()
        # self.get_process_NWP()
        # self.get_process_tower()
        # self.indep_process()
        self.get_process_turbine()
        self.process_csv_files('./cache/data', './cache/data', 50, 5)
        self.TimeMerge('./cache/data', './cache/data', 50)
        self.zone_powers('./cache/data')
if __name__ == '__main__':
    # Script entry point: build configuration from CLI + yaml, then run the
    # full data-export pipeline.
    from logs import Log
    from config import myargparse
    import matplotlib.pyplot as plt
    import matplotlib.colors as mcolors
    import pandas as pd
    args = myargparse(discription="场站端配置", add_help=False)
    opt = args.parse_args_and_yaml()
    log = Log().logger
    # NOTE(review): begin/end are empty strings; only the csv-cache-driven
    # steps in data_process() (which ignore them) are currently enabled.
    db = DataBase(begin='', end='', opt=opt, logger=log)
    db.data_process()