#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :data_nwp_ftp.py
# @Time :2024/12/26 08:38
# @Author :David
# @Company: shenyang JY
from datetime import timedelta
from ftplib import FTP
import threading, glob, pytz
import pandas as pd
from pytz import timezone
from flask import Flask, request, jsonify
import time, datetime, os, traceback, re
import zipfile, tempfile, shutil, fnmatch
from common.database_dml import insert_data_into_mongo
from apscheduler.schedulers.background import BackgroundScheduler
from common.logs import Log

logger = Log('data-processing').logger
app = Flask('data_nwp_ftp——service')


def update_thread():
    thread = threading.Thread(target=start_jobs)
    thread.start()


def start_jobs():
    scheduler = BackgroundScheduler()
    scheduler.configure({'timezone': timezone("Asia/Shanghai")})
    scheduler.add_job(func=download_zip_files_from_ftp, trigger="interval", seconds=300)
    scheduler.start()
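
# Note: start_jobs() keeps the local cache in sync with the FTP server by running
# download_zip_files_from_ftp() every 300 seconds in the Asia/Shanghai timezone.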


def match_date(date, filename):
    given_date = datetime.datetime.strptime(date, '%Y%m%d')
    date_pattern = re.compile(r'(\d{8})')
    match = date_pattern.search(filename)
    if match:
        filename_str = match.group(0)
        filename_date = datetime.datetime.strptime(filename_str, '%Y%m%d')
        if filename_date <= given_date:
            return True
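
# For example, match_date('20241226', 'meteoforce_J00645_2024122606_weather.csv') returns True,
# because the first 8-digit run in the filename (20241226) is not after the given date.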


def delete_zip_files(date):
    xxl_path = ftp_params['xxl']['local_dir']
    # Walk every entry under the local cache directory
    for root, dirs, files in os.walk(xxl_path):
        for filename in files:
            # Delete ZIP archives whose embedded date is on or before the given date
            if fnmatch.fnmatch(filename, '*.zip') and match_date(date, filename):
                # Build the full path of the file
                file_path = os.path.join(xxl_path, filename)
                # Delete the file
                try:
                    os.remove(file_path)
                    print(f"Deleted file: {file_path}")
                except OSError as e:
                    print(f"Error deleting file {file_path}: {e.strerror}")
        for farmId in dirs:
            target_dir_path = os.path.join(root, farmId)
            for file_name in os.listdir(target_dir_path):
                csv_file_path = os.path.join(target_dir_path, file_name)
                # Delete per-farm CSV files whose embedded date is on or before the given date
                if fnmatch.fnmatch(file_name, '*.csv') and match_date(date, file_name):
                    try:
                        os.remove(csv_file_path)
                        print(f"Deleted file: {csv_file_path}")
                    except OSError as e:
                        print(f"Error deleting file {csv_file_path}: {e.strerror}")
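
# Illustrative call: delete_zip_files('20241225') removes every cached ZIP archive and
# per-farm CSV whose filename date is 2024-12-25 or earlier (see the commented call in __main__).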


def download_zip_files_from_ftp(moment=None):
    now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    date = now.strftime("%Y%m%d")
    date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
    if moment is None:
        # Default to the latest forecast issue time that has already passed today
        if now.hour >= 18:
            moment = '18'
        elif now.hour >= 12:
            moment = '12'
        elif now.hour >= 6:
            moment = '06'
        else:
            moment = '00'
    host = 'xxl'
    ftp_host, ftp_user, ftp_password, remote_dir, local_dir = (
        ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'],
        ftp_params[host]['remote_dir'], ftp_params[host]['local_dir'])
    zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
    zip_file_path = []
    # Connect to the FTP server
    with FTP(ftp_host) as ftp:
        ftp.login(user=ftp_user, passwd=ftp_password)
        # Switch to the remote directory
        ftp.cwd(remote_dir)
        # List the files and directories in the remote directory
        files = ftp.nlst()
        # Download every ZIP file matching the current date and issue time
        for file_name in files:
            if fnmatch.fnmatch(file_name, zip_extension):
                remote_file_path = os.path.join(remote_dir, file_name)
                local_file_path = os.path.join(local_dir, file_name)
                with open(local_file_path, 'wb') as local_file:
                    logger.info(f"Downloading {remote_file_path} to {local_file_path}")
                    ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
                    logger.info(f"Downloaded {file_name}")
                zip_file_path.append(local_file_path)
    # Extract the downloaded ZIP files into the local cache directory
    for zip_file_p in zip_file_path:
        with zipfile.ZipFile(zip_file_p, 'r') as zip_ref:
            zip_ref.extractall(local_dir)
    # Remove all ZIP/CSV files dated two days ago or earlier
    delete_zip_files(date_2)
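
# Illustrative call: download_zip_files_from_ftp(moment='06') fetches today's
# meteoforce_<YYYYMMDD>06_*.zip archives, unpacks them into the local cache, and prunes old files.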


def select_file_to_mongo(args):
    date, moment, farmId, isDq = args['date'], args['moment'], args['farmId'], args['isDq']
    date = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00').strftime("%Y%m%d")
    csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
    csv_file_weather = csv_file_format.replace('*', 'weather')
    csv_file_power = csv_file_format.replace('*', 'power')
    csv_weather_path, csv_power_path = False, False
    # Locate the farm's directory and look up its weather/power CSV files
    for root, dirs, files in os.walk(ftp_params['xxl']['local_dir']):
        if farmId in dirs:
            target_dir_path = os.path.join(root, farmId)
            for file_name in os.listdir(target_dir_path):
                csv_file_path = os.path.join(target_dir_path, file_name)
                if fnmatch.fnmatch(file_name, csv_file_weather):
                    csv_weather_path = csv_file_path
                if fnmatch.fnmatch(file_name, csv_file_power):
                    csv_power_path = csv_file_path
        if csv_weather_path or csv_power_path:
            break
    # Read the CSV files with pandas
    weather = pd.read_csv(csv_weather_path) if csv_weather_path else None
    power = pd.read_csv(csv_power_path) if csv_power_path else None
    if isDq:
        if csv_weather_path and csv_power_path:
            power = power.drop(columns=['farm_id'])
            weather_power = pd.merge(weather, power, on='date_time')
            # Keep only the D0-D13 window
            df = select_dx_from_nwp(weather_power, args)
            insert_data_into_mongo(df, args)
        else:
            logger.info(f"CSV file {csv_file_power} or {csv_file_weather} not found in target directory {farmId}")
    else:
        if csv_weather_path:
            # Keep only the D0-D13 window
            df = select_dx_from_nwp(weather, args)
            insert_data_into_mongo(df, args)
        else:
            logger.info(f"CSV file {csv_file_weather} not found in target directory {farmId}")


def select_dx_from_nwp(df, args):
    # Slice the forecast to the requested day window relative to args['date'] (defaults: D0 to D13)
    date = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00')
    date_begin = date + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
    date_end = date + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))
    df['date_time'] = df['date_time'].str.replace("_", " ")
    df['date_time'] = pd.to_datetime(df['date_time'])
    df.set_index('date_time', inplace=True)
    df = df.loc[date_begin.strftime('%Y-%m-%d'): date_end.strftime('%Y-%m-%d')].reset_index(drop=False)
    df.reset_index(drop=True, inplace=True)
    df['date_time'] = df['date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
    return df
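
# Illustrative example: with args['date'] = '2024-12-27 00:00:00' and the defaults D0/D13,
# rows from 2024-12-27 00:00 through the end of 2025-01-09 are kept (pandas partial-string
# slicing on the DatetimeIndex includes the whole end day).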


# FTP connection and local cache settings for the 'xxl' source
ftp_params = {
    'xxl': {
        'host': '39.107.246.215',
        'user': 'jiayue',
        'password': 'JYoguf2018',
        'remote_dir': './',
        'local_dir': './data_processing/cache/data/xxl'
    }
}


@app.route('/data_nwp_ftp', methods=['POST'])
def get_nwp_from_ftp():
    # Record the request start time
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    # print("data_nwp_ftp starts execution!")
    try:
        args = request.values.to_dict()
        # 1. Parse the request parameters: date, data source, issue time, day window (D0-D13), farm ID, target MongoDB database and collection
        # print('args', args)
        logger.info(args)
        # 2. The scheduled job downloads the matching ZIP archives from the FTP server
        # 3. Pick the extracted CSV files and store them in MongoDB
        select_file_to_mongo(args)
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
        logger.info("Production: failed to fetch raw NWP data: {}".format(my_exception))
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    # print("Program execution ends!")
    return result
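
# Illustrative request (field values are placeholders taken from the commented example below):
#   curl -X POST http://localhost:10102/data_nwp_ftp \
#        -d "date=2024-12-27 00:00:00" -d "moment=06" -d "farmId=J00645" -d "isDq=True" \
#        -d "mongodb_database=db2" -d "mongodb_write_table=j00645-d1" -d "day_begin=D0" -d "day_end=D13"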


if __name__ == "__main__":
    print("Program starts execution!")
    from waitress import serve
    update_thread()  # Start the scheduled FTP download job
    # current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    # current_hour = current_time.hour
    # if current_hour >= 18:
    #     current_hour = '18'
    # elif current_hour >= 12:
    #     current_hour = '12'
    # elif current_hour >= 6:
    #     current_hour = '06'
    # else:
    #     current_hour = '00'
    current_hour = '06'  # By default, download the 06 issue-time ZIP on the first run
    threading.Thread(target=download_zip_files_from_ftp, kwargs={'moment': current_hour}).start()
    serve(app, host="0.0.0.0", port=10102)
    print("server start!")
    # args = {"source": 'xxl', "date": '2024-12-27 00:00:00', 'moment': '06', 'farmId': 'J00645',
    #         'mongodb_database': 'db2', 'mongodb_write_table': 'j00645-d1', 'day_begin': 'D0',
    #         'day_end': 'D13', 'isDq': True}
    # download_zip_files_from_ftp(moment='06')
    # select_file_to_mongo(args)
    # delete_zip_files('20241225')