# inputData.py

import pandas as pd
import datetime, time
import re
import os
import glob
import argparse
from sqlalchemy import create_engine
import pytz
from data_cleaning import cleaning, rm_duplicated, key_field_row_cleaning

# current_path = os.path.dirname(__file__)
# dataloc = current_path + '/data/'
station_id = '260'


def readData(name):
    """
    Read a csv file from the station directory.
    :param name: file name
    :return: DataFrame
    """
    path = rf"../../cluster/{station_id}/" + name
    return pd.read_csv(path)


def saveData(name, data):
    """
    Save a DataFrame to the station directory as csv.
    :param name: file name
    :param data: DataFrame to save
    :return:
    """
    path = rf"../../cluster/{station_id}/" + name
    os.makedirs(os.path.dirname(path), exist_ok=True)
    data.to_csv(path, index=False)


def timestamp_to_datetime(ts):
    """
    Convert a 10-digit (seconds) or 13-digit (milliseconds) Unix timestamp
    to an Asia/Shanghai datetime.
    """
    local_timezone = pytz.timezone('Asia/Shanghai')
    if not isinstance(ts, int):
        raise ValueError("timestamp must be an integer")
    if len(str(ts)) == 13:  # millisecond timestamp
        return datetime.datetime.fromtimestamp(ts / 1000, tz=pytz.utc).astimezone(local_timezone)
    elif len(str(ts)) == 10:  # second timestamp
        return datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
    else:
        raise ValueError("unrecognized timestamp length")


def dt_tag(dt):
    """Return the 1-based index of dt's 15-minute slot within its day."""
    date = dt.replace(hour=0, minute=0, second=0)
    delta = (dt - date) / pd.Timedelta(minutes=15)
    return delta + 1
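
# Usage sketch for the two helpers above (illustrative values, not executed
# on import):
#   timestamp_to_datetime(1735689600)         -> 2025-01-01 08:00:00+08:00 (Asia/Shanghai)
#   timestamp_to_datetime(1735689600000)      -> the same instant, parsed as milliseconds
#   dt_tag(pd.Timestamp('2025-01-01 00:15'))  -> 2.0, the second 15-minute slot of the day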


def timestr_to_timestamp(time_str):
    """
    Convert a time string to a millisecond Unix timestamp; non-strings are
    returned unchanged.
    :param time_str: str ('%Y-%m-%d' or '%Y-%m-%d %H:%M:%S') or int
    :return: int, millisecond timestamp
    """
    if isinstance(time_str, str):
        if len(time_str) == 10:
            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
            return int(round(time.mktime(dt.timetuple())) * 1000)
        elif len(time_str) in {17, 18, 19}:
            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')  # strptime requires an exact format match
            return int(round(time.mktime(dt.timetuple())) * 1000)  # convert to a millisecond timestamp
        else:
            raise ValueError("time string length is not supported!")
    else:
        return time_str
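
# Usage sketch (illustrative; the numeric result depends on the machine's
# local timezone, since time.mktime interprets local time):
#   timestr_to_timestamp('2025-01-01')           -> ms timestamp of local midnight
#   timestr_to_timestamp('2025-01-01 12:30:00')  -> ms timestamp of that local time
#   timestr_to_timestamp(1735689600000)          -> returned unchanged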


class DataBase(object):
    def __init__(self, begin, end, opt):
        self.begin = begin
        self.end = end
        self.opt = opt
        self.begin_stamp = timestr_to_timestamp(str(begin))
        self.end_stamp = timestr_to_timestamp(str(end))
        self.database = opt.database
        self.dataloc = opt.dataloc

    def clear_data(self):
        """
        Delete every csv file under self.dataloc.
        :return:
        """
        folder_path = self.dataloc
        # collect all .csv paths recursively
        csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
        # delete each file
        for file_path in csv_files:
            os.remove(file_path)
        print("Removed all csv files")

    def create_database(self):
        """
        Create the database connection.
        :return: sqlalchemy engine
        """
        engine = create_engine(self.database)
        return engine

    def exec_sql(self, sql, engine):
        """
        Fetch data from the database.
        :param sql: sql statement
        :param engine: database engine
        :return: DataFrame
        """
        df = pd.read_sql_query(sql, engine)
        return df

    def get_process_power(self):
        """
        Fetch and clean the station-level power data.
        :return:
        """
        engine = self.create_database()
        sql_cap = "select C_CAPACITY from t_electric_field"
        cap = float(self.exec_sql(sql_cap, engine)['C_CAPACITY'].iloc[0])
        sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
                    " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
        powers = self.exec_sql(sql_power, engine)
        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
        mask1 = powers['C_REAL_VALUE'].astype(float) > cap  # readings above station capacity
        mask = powers['C_REAL_VALUE'] == -99                # -99 sentinel readings
        mask = mask | mask1
        print("Actual power: {} rows in total, {} rows to drop".format(len(powers), mask.sum()))
        powers = powers[~mask]
        print("{} rows remain after filtering".format(len(powers)))
        binary_map = {b'\x00': 0, b'\x01': 1}  # map byte flags from the database to 0/1
        powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
        powers = rm_duplicated(powers)
        saveData("power.csv", powers)
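
    # Filtering sketch for the masks above (illustrative): with cap = 100,
    #   C_REAL_VALUE = [-99, 50, 120]  ->  mask = [True, False, True]
    # i.e. -99 sentinel readings and values above capacity are both dropped.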

    def get_process_turbine(self, output_dir):
        """
        Fetch per-turbine data from the database and apply light preprocessing.
        :param output_dir: directory for the exported csv files
        :return:
        """
        for number in self.opt.turbineloc:
            # per-turbine data
            engine = self.create_database()
            print("Exporting data for turbine {}".format(number))
            sql_turbine = "select C_TIME, C_WS, C_WD, C_ACTIVE_POWER from t_wind_turbine_status_data " \
                          "WHERE C_EQUIPMENT_NO=" + str(number) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)  # + " and C_WS>0 and C_ACTIVE_POWER>0"
            turbine = self.exec_sql(sql_turbine, engine)
            turbine = cleaning(turbine, 'turbine', cols=['C_WS', 'C_ACTIVE_POWER'], dup=False)
            turbine['C_TIME'] = pd.to_datetime(turbine['C_TIME'])
            # keep only quarter-hour samples
            turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
            # export all rows directly (writing here rather than via saveData,
            # which would prepend the station directory a second time)
            output_file = os.path.join(output_dir, f"turbine-{number}.csv")
            os.makedirs(output_dir, exist_ok=True)
            turbine.to_csv(output_file, index=False)
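
    # Minute-filter sketch for the quarter-hour check above (illustrative):
    #   pd.Series(pd.to_datetime(['2025-01-01 10:00', '2025-01-01 10:07']))
    #       .dt.strftime('%M').isin(['00', '15', '30', '45'])  -> [True, False]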

    def process_csv_files(self, input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled; M is currently unused
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        for i in self.opt.turbineloc:
            input_file = os.path.join(input_dir, f"turbine-{i}.csv")
            output_file = os.path.join(output_dir, f"turbine-{i}.csv")
            # read the csv file
            df = pd.read_csv(input_file)
            # drop outliers and collect statistics about them
            df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = self.remove_abnormal_values(df, N)
            # report the outlier statistics
            print(f"Processing file: {input_file}")
            print(f"-99 outliers removed: {count_abnormal1}")
            print(f"Constant-streak outliers removed: {count_abnormal2}")
            print(f"Total rows removed: {total_removed}")
            print(f"Values in the removed streaks: {removed_continuous_values}\n")
            # save the processed csv file
            df_clean.to_csv(output_file, index=False)

    def remove_abnormal_values(self, df, N):
        # mark rows where C_ACTIVE_POWER is -99 as outliers
        abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
        count_abnormal1 = abnormal_mask1.sum()
        # mark rows where C_WS, C_WD and C_ACTIVE_POWER stay constant for N consecutive rows
        columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
        abnormal_mask2 = self.mark_abnormal_streaks(df, columns, N)
        count_abnormal2 = abnormal_mask2.sum()
        # combined boolean mask of all outliers
        abnormal_mask = abnormal_mask1 | abnormal_mask2
        # collect the distinct values inside the removed streaks
        removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
        # drop the outliers
        df_clean = df[~abnormal_mask]
        total_removed = abnormal_mask.sum()
        return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values

    # —— Cleaning of -99 wind speeds and constant-value streaks ——
    def mark_abnormal_streaks(self, df, columns, min_streak):
        """Return a boolean mask marking runs where all `columns` stay
        constant for at least `min_streak` consecutive rows."""
        abnormal_mask = pd.Series(False, index=df.index)
        streak_start = None
        for i in range(len(df)):
            # a streak restarts whenever any monitored column changes
            if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
                streak_start = i
            # once the current streak reaches min_streak rows, mark all of them
            if i - streak_start >= min_streak - 1:
                abnormal_mask[i - min_streak + 1:i + 1] = True
        return abnormal_mask
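
    # Worked sketch (illustrative): with one monitored column, min_streak=3
    # and values [5, 5, 5, 7], rows 0-2 form a constant run of length 3, so
    # the returned mask is [True, True, True, False]; the changed value at
    # row 3 restarts the streak counter.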

    # —— Time alignment across individual turbines ——
    def TimeMerge(self, input_dir, output_dir, M):  # M is currently unused
        # read all per-turbine csv files
        files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in self.opt.turbineloc]
        dataframes = [pd.read_csv(f) for f in files]
        # intersect the C_TIME columns across all turbines
        c_time_intersection = set(dataframes[0]["C_TIME"])
        for df in dataframes[1:]:
            c_time_intersection.intersection_update(df["C_TIME"])
        # keep only rows whose C_TIME is in the intersection; copy to avoid a
        # pandas SettingWithCopyWarning on the unit fix below
        filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)].copy() for df in dataframes]
        # write each filtered DataFrame to a new csv file
        os.makedirs(output_dir, exist_ok=True)
        for (filtered_df, i) in zip(filtered_dataframes, self.opt.turbineloc):
            if i == 144:
                filtered_df['C_ACTIVE_POWER'] /= 1000  # unit correction for turbine 144
            filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
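
    # Alignment sketch (illustrative): if turbine A has C_TIME {t1, t2, t3}
    # and turbine B has {t2, t3, t4}, both exported files keep only the common
    # timestamps {t2, t3}, so every turbine series covers the same instants.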

    def data_process(self):
        """
        Top-level driver: export the data and run the initial processing steps.
        :return:
        """
        self.clear_data()
        self.get_process_power()
        self.get_process_turbine(f'../../cluster/{station_id}')
        self.process_csv_files(f'../../cluster/{station_id}', f'../../cluster/{station_id}', 50, 5)
        self.TimeMerge(f'../../cluster/{station_id}', f'../../cluster/{station_id}', 50)
        # self.zone_powers('../cluster/data')


if __name__ == '__main__':
    turbineloc = list(range(76, 151))
    c_names = ['G01', 'G02', 'G03', 'G04', 'G05', 'G06', 'G07', 'G08', 'G09', 'G10'] + ['G' + str(x) for x in range(11, 76)]
    id_names = {tid: c_names[x] for x, tid in enumerate(turbineloc)}
    args = {'database': 'mysql+pymysql://root:mysql_T7yN3E@192.168.12.10:19306/ipfcst_j00260_20250507161106',
            'cap': 225,
            'id_name': id_names,
            'turbineloc': turbineloc,
            'dataloc': '../cluster/data'}
    opt = argparse.Namespace(**args)
    db = DataBase(begin='2025-01-01', end='2025-05-01', opt=opt)
    db.data_process()