#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :data_nwp_ftp.py
# @Time :2024/12/26 08:38
# @Author :David
# @Company: shenyang JY
import requests, json
from datetime import timedelta
from ftplib import FTP
import threading, glob, pytz
import pandas as pd
from pytz import timezone
from flask import Flask, request, jsonify
import time, datetime, os, traceback, re
import zipfile, tempfile, shutil, fnmatch
from common.database_dml import insert_data_into_mongo
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from ftplib import error_temp, error_perm
from socket import error as socket_error
from common.logs import Log

logger = Log('data-processing').logger
app = Flask('data_nwp_ftp——service')

def update_thread():
    thread = threading.Thread(target=start_jobs)
    thread.start()


def start_jobs():
    scheduler = BackgroundScheduler()
    scheduler.configure({'timezone': timezone("Asia/Shanghai")})
    trigger1 = CronTrigger(
        hour='0,1,2,4,5,6,11,12,13,16,17,18',  # hours at which the download job fires
        minute='0-50/10',                      # every 10 minutes, covering :00-:50
        timezone='Asia/Shanghai'               # set the time zone as needed
    )
    scheduler.add_job(func=download_zip_files_from_ftp, trigger=trigger1)
    scheduler.start()

def match_date(date, filename):
    """Return True if the 8-digit date embedded in filename is on or before the given date."""
    given_date = datetime.datetime.strptime(date, '%Y%m%d')
    date_pattern = re.compile(r'(\d{8})')
    match = date_pattern.search(filename)
    if match:
        filename_str = match.group(0)
        filename_date = datetime.datetime.strptime(filename_str, '%Y%m%d')
        if filename_date <= given_date:
            return True
    return False

def delete_zip_files(date):
    xxl_path = ftp_params['xxl']['local_dir']
    # Walk the whole local cache directory
    for root, dirs, files in os.walk(xxl_path):
        for filename in files:
            # Delete ZIP archives whose embedded date is on or before the cut-off date
            if fnmatch.fnmatch(filename, '*.zip') and match_date(date, filename):
                # Build the full path relative to the directory currently being walked
                file_path = os.path.join(root, filename)
                try:
                    os.remove(file_path)
                    print(f"Deleted file: {file_path}")
                except OSError as e:
                    print(f"Error deleting file {file_path}: {e.strerror}")
        for farmId in dirs:
            # Delete the per-farm CSV files that are just as old
            target_dir_path = os.path.join(root, farmId)
            for file_name in os.listdir(target_dir_path):
                csv_file_path = os.path.join(target_dir_path, file_name)
                if fnmatch.fnmatch(csv_file_path, '*.csv') and match_date(date, file_name):
                    try:
                        os.remove(csv_file_path)
                        print(f"Deleted file: {csv_file_path}")
                    except OSError as e:
                        print(f"Error deleting file {csv_file_path}: {e.strerror}")

def get_moment_next(schedule_dt=None):
    if schedule_dt is not None:
        now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
    else:
        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    date = now.strftime('%Y%m%d')
    if now.hour == 18:
        moment = '18'
    elif now.hour > 18:
        moment = '00'
    elif now.hour == 12:
        moment = '12'
    elif now.hour > 12:
        moment = '18'
    elif now.hour == 6:
        moment = '06'
    elif now.hour > 6:
        moment = '12'
    elif 2 >= now.hour >= 0:
        moment = '00'
    else:
        moment = '06'
    return date, moment

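# Reference summary of how get_moment_next() maps the current Asia/Shanghai hour
# to a forecast "moment" (derived directly from the branches above; shown only
# as an orientation aid, not an authoritative spec):
#   hour 19-23 or 0-2  ->  '00'
#   hour 3-6           ->  '06'
#   hour 7-12          ->  '12'
#   hour 13-18         ->  '18'
# The returned date is always the date of "now" (or of the optional schedule_dt
# string) formatted as YYYYMMDD.
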
def get_previous_moment(original_date, original_moment):
    # Interpret the original date and moment as an Asia/Shanghai datetime
    tz_shanghai = timezone('Asia/Shanghai')
    original_dt = datetime.datetime.strptime(f"{original_date}{original_moment}", "%Y%m%d%H")
    original_dt = tz_shanghai.localize(original_dt)
    # Step back six hours to the previous forecast run
    new_dt = original_dt - datetime.timedelta(hours=6)
    # Format as the schedule_dt string accepted by get_moment_next
    schedule_dt_str = new_dt.strftime('%Y-%m-%d %H:%M:%S')
    # Reuse get_moment_next to derive the new date and moment
    new_date, new_moment = get_moment_next(schedule_dt=schedule_dt_str)
    return new_date, new_moment

def safe_ftp_download(ftp, remote_file_path, local_file_path, max_retries=3):
    file_name = os.path.basename(local_file_path)
    attempt = 0
    while attempt < max_retries:
        try:
            # Prepare the connection for the transfer
            ftp.pwd()
            ftp.sendcmd("NOOP")    # keep the connection alive
            ftp.voidcmd("TYPE I")  # make sure we are in binary mode
            # Record the start time
            start = time.time()
            logger.info(f"开始第 {attempt + 1} 次下载尝试: {remote_file_path}")
            # Use a context manager so the local file is always closed
            with open(local_file_path, 'wb') as local_file:
                # Set the timeout and enable passive mode
                ftp.timeout = 3000
                ftp.set_pasv(True)
                # Download with a progress callback
                def _callback(data):
                    local_file.write(data)
                    logger.debug(f"已接收 {len(data)} 字节")
                ftp.retrbinary(f'RETR {remote_file_path}', _callback)
            # Verify the file is complete
            remote_size = ftp.size(remote_file_path)
            local_size = os.path.getsize(local_file_path)
            if local_size != remote_size:
                raise IOError(f"文件大小不匹配: 本地 {local_size} vs 远程 {remote_size}")
            # Log the successful download
            end = time.time()
            now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
            logger_text = f"""下载成功!时间:{now.strftime('%Y-%m-%d %H:%M:%S')}
文件:{file_name}
耗时:{(end - start) / 60:.2f}分钟
平均速度:{(remote_size / 1024 / 1024) / (end - start):.2f}MB/s"""
            logger.info(logger_text)
            send_message(file_name, logger_text)
            return True
        except (error_temp, error_perm, socket_error, IOError) as e:
            logger.error(f"第 {attempt + 1} 次下载失败: {str(e)}")
            # Remove the partially downloaded file before retrying
            if os.path.exists(local_file_path):
                try:
                    os.remove(local_file_path)
                    logger.warning(f"已删除不完整文件: {local_file_path}")
                except Exception as clean_error:
                    logger.error(f"文件清理失败: {str(clean_error)}")
            attempt += 1
            time.sleep(5)  # wait before retrying
        except Exception as unexpected_error:
            logger.critical(f"未知错误: {str(unexpected_error)}")
            raise
    logger_text = f"下载失败: 已达最大重试次数 {max_retries}"
    logger.error(logger_text)
    send_message(file_name, logger_text)
    return False

def download_zip_files_from_ftp(moment=None):
    now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    date = now.strftime("%Y%m%d")
    date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
    if moment is None:
        date, moment = get_moment_next()
    host = 'xxl'
    ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params[host]['local_dir']
    zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
    zip_file_path = []
    # Connect to the FTP server
    with FTP(ftp_host) as ftp:
        ftp.login(user=ftp_user, passwd=ftp_password)
        # Switch to the remote directory
        ftp.cwd(remote_dir)
        # List the files and directories in the remote directory
        files = ftp.nlst()
        # Download every ZIP file that matches the requested date and moment
        for file_name in files:
            if fnmatch.fnmatch(file_name, zip_extension):
                remote_file_path = os.path.join(remote_dir, file_name)
                local_file_path = os.path.join(local_dir, file_name)
                # Skip files that have already been downloaded
                if os.path.isfile(local_file_path):
                    continue
                if safe_ftp_download(ftp, remote_file_path, local_file_path):
                    zip_file_path.append(local_file_path)
    # Extract the downloaded ZIP files into the local cache directory
    for zip_file_p in zip_file_path:
        with zipfile.ZipFile(zip_file_p, 'r') as zip_ref:
            zip_ref.extractall(local_dir)
    # Delete ZIP and CSV files dated two or more days ago
    delete_zip_files(date_2)

def select_file_to_mongo(args, date, moment):
    farmId, isDq = args['farmId'], args['isDq']
    csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
    csv_file_weather = csv_file_format.replace('*', 'weather')
    csv_file_power = csv_file_format.replace('*', 'power')
    csv_weather_path, csv_power_path = False, False
    # Locate the farm's directory and look for the weather / power CSV files
    for root, dirs, files in os.walk(ftp_params['xxl']['local_dir']):
        if farmId in dirs:
            target_dir_path = os.path.join(root, farmId)
            for file_name in os.listdir(target_dir_path):
                csv_file_path = os.path.join(target_dir_path, file_name)
                if fnmatch.fnmatch(file_name, csv_file_weather):
                    csv_weather_path = csv_file_path
                    logger.info("***找到nwp:{}***".format(csv_weather_path))
                if fnmatch.fnmatch(file_name, csv_file_power):
                    csv_power_path = csv_file_path
                    logger.info("***找到power:{}***".format(csv_power_path))
        if csv_weather_path or csv_power_path:
            break
    if csv_weather_path is False:
        logger.info("获取nwp文件异常:找不到场站 {} nwp文件".format(farmId))
        return False
    # Read the CSV files with pandas
    weather = pd.read_csv(csv_weather_path)
    power = pd.read_csv(csv_power_path) if csv_power_path else None
    if isDq:
        if csv_weather_path and csv_power_path:
            power.drop(columns=['farm_id'], inplace=True)
            weather_power = pd.merge(weather, power, on='date_time')
            # Keep only the D0-D13 window
            df = select_dx_from_nwp(weather_power, args)
            insert_data_into_mongo(df, args)
        else:
            df = select_dx_from_nwp(weather, args)
            insert_data_into_mongo(df, args)
            logger.info(f"CSV 文件 {csv_file_power} 在目标目录 {farmId} 中未找到")
    else:
        if csv_weather_path:
            # Keep only the D0-D13 window
            df = select_dx_from_nwp(weather, args)
            insert_data_into_mongo(df, args)
        else:
            logger.info(f"CSV 文件 {csv_file_weather} 在目标目录 {farmId} 中未找到")
    return True

def select_dx_from_nwp(df, args):
    # Slice the forecast to the requested D-day window (defaults to D0-D13)
    date, moment = get_moment_next(args.get('dt'))
    date = datetime.datetime.strptime(date, "%Y%m%d")
    date_begin = date + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
    date_end = date + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))
    # Normalize the timestamp column and use it as the index for label-based slicing
    df['date_time'] = df['date_time'].str.replace("_", " ")
    df['date_time'] = pd.to_datetime(df['date_time'])
    df.set_index('date_time', inplace=True)
    df = df.loc[date_begin.strftime('%Y-%m-%d'): date_end.strftime('%Y-%m-%d')].reset_index(drop=False)
    df.reset_index(drop=True, inplace=True)
    df['date_time'] = df['date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
    return df

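# Illustrative example of the window selection above (dates are hypothetical and
# assume the reference date resolved by get_moment_next() is 20241226):
#   day_begin='D0', day_end='D13'  ->  keeps rows whose date_time falls between
#       2024-12-26 and 2025-01-08, both days included (label-based .loc slice)
#   day_begin='D1', day_end='D1'   ->  keeps only the rows falling on 2024-12-27
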
# Example configuration: FTP connection parameters
ftp_params = {
    'xxl': {
        'host': '39.107.246.215',
        'user': 'jiayue',
        'password': 'JYoguf2018',
        'remote_dir': './',
        'local_dir': 'data_processing/cache/data/xxl'
    }
}

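# Expected local cache layout (inferred from download_zip_files_from_ftp and
# select_file_to_mongo; shown for orientation only, the exact nesting depth of
# the extracted archives is not fixed, which is why os.walk() is used):
#   <local_dir>/meteoforce_<date><moment>_*.zip                     downloaded archives
#   <local_dir>/.../<farmId>/meteoforce_<farmId>_<date><moment>_weather.csv
#   <local_dir>/.../<farmId>/meteoforce_<farmId>_<date><moment>_power.csv
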
def send_message(filename, text):
    """
    Notify the WeCom (企业微信) webhook when:
    1. a download times out or fails
    2. the NWP file cannot be found
    """
    url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=553f02de-0ef4-49ce-8d68-2489c032d42f"
    # Request headers
    headers = {'Content-Type': 'application/json'}
    text = f"""<font color='warning'>算法平台解析nwp模块</font>
>文件:<font color='comment'>{filename}</font>\n
>日志:<font color='comment'> {text} \n</font> """
    # Markdown message payload
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": text
        }
    }
    # Send the POST request
    response = requests.post(url, headers=headers, data=json.dumps(data))

@app.route('/data_nwp_ftp', methods=['POST'])
def get_nwp_from_ftp():
    # Record the start time
    start_time = time.time()
    result = {}
    success = False
    args = {}
    try:
        args = request.values.to_dict()
        # 1. Read the request parameters: date, data source, moment, D0-Dx window,
        #    farm ID, and the target MongoDB database/collection
        logger.info(args)
        # 2. A scheduled job connects to the FTP server and downloads the matching ZIP archives
        # 3. Extract the archives and store the selected data in MongoDB
        date, moment = get_moment_next(args.get('dt'))
        logger.info("从日期{}时刻{}开始查找文件".format(date, moment))
        success = select_file_to_mongo(args, date, moment)
        new_date = date
        i = 1
        # Fall back to earlier forecast runs (same day only, at most two 6-hour steps)
        while not success and date == new_date and i <= 2:
            new_date, moment = get_previous_moment(date, moment)
            logger.info("未找到,从日期{}时刻{}开始查找文件".format(new_date, moment))
            success = select_file_to_mongo(args, new_date, moment)
            i += 1
        if not success:
            csv_file_weather = 'meteoforce_{}_{}_weather.csv'.format(args['farmId'], date + str(moment))
            logger_text = f"未找到 {csv_file_weather} 文件"
            logger.info(logger_text)
            send_message(csv_file_weather, logger_text)
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
        logger.info("生产,获取原始nwp出错:{}".format(my_exception))
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    return result

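# Example request against this endpoint (illustrative only: the host, farm ID and
# dt value are assumptions, and any MongoDB target fields expected by
# insert_data_into_mongo must also be supplied in the form data):
#
#   import requests
#   resp = requests.post("http://localhost:10102/data_nwp_ftp",
#                        data={"farmId": "j01096", "isDq": 1,
#                              "dt": "2024-12-26 08:00:00",
#                              "day_begin": "D0", "day_end": "D13"})
#
# The JSON response carries success, args, start_time and end_time.
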
if __name__ == "__main__":
    print("Program starts execution!")
    from waitress import serve
    update_thread()  # start the scheduled download job
    # Kick off an immediate download for every forecast moment, each in its own thread
    for moment in ['00', '06', '12', '18']:
        threading.Thread(target=download_zip_files_from_ftp, kwargs={'moment': moment}).start()
    serve(app, host="0.0.0.0", port=10102)
    print("server start!")
    # now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    # logger_text = f"下载完成时间:{now.strftime('%Y-%m-%d %H:%M:%S')},下载 20230323-meteoforece-nwp.zip 文件,用时 {700/60:.2f}分钟"
    # logger.info(logger_text)
    # send_message('20230323-meteoforece-nwp.zip', logger_text)
    # farmId, isDq = 'j01096', 1
    # date, moment = get_moment_next()
    # csv_file_weather = 'meteoforce_{}_{}_weather.csv'.format(farmId, date + str(moment))
    # logger_text = f"未找到 {csv_file_weather} 文件"
    # send_message(csv_file_weather, logger_text)