pre_data_ftp.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :pre_data_ftp.py
  4. # @Time :2024/12/26 14:18
  5. # @Author :David
  6. # @Company: shenyang JY
  7. import yaml
  8. from ftplib import FTP
  9. import pandas as pd
  10. from flask import Flask, request, jsonify
  11. import time, datetime, os, traceback, pytz
  12. from pytz import timezone
  13. import zipfile, tempfile, shutil, paramiko
  14. from common.database_dml import get_data_from_mongo
  15. from common.logs import Log
  16. logger = Log('data-processing').logger
  17. app = Flask('pre_data_ftp——service')
  18. current_dir = os.path.dirname(os.path.abspath(__file__))
  19. with open(os.path.join(current_dir, 'ftp.yaml'), 'r', encoding='utf-8') as f:
  20. ftp_params = yaml.safe_load(f) # 只读的全局配置
  21. def get_moment_next(schedule_dt=False):
  22. if schedule_dt:
  23. now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
  24. else:
  25. now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
  26. date = now.strftime('%Y%m%d')
  27. if now.hour == 18:
  28. moment = '18'
  29. elif now.hour > 18:
  30. moment = '00'
  31. elif now.hour == 12:
  32. moment = '12'
  33. elif now.hour > 12:
  34. moment = '18'
  35. elif now.hour == 6:
  36. moment = '06'
  37. elif now.hour > 6:
  38. moment = '12'
  39. elif 2 >= now.hour >= 0:
  40. moment = '00'
  41. else:
  42. moment = '06'
  43. return date, moment
  44. def zip_temp_file(df, args, tag):
  45. def zip_folder(folder_path, zip_filePath):
  46. zip_file = zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED)
  47. for root, dirs, files in os.walk(folder_path):
  48. for file in files:
  49. file_path = os.path.join(root, file)
  50. zip_file.write(file_path, os.path.relpath(file_path, folder_path))
  51. zip_file.close()
  52. temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
  53. date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
  54. modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
  55. csv_file = 'jy_{}.{}.{}_{}_{}{}_{}.csv'.format(modeler, model, version, farmId, date, moment, tag)
  56. csv_path = os.path.join(temp_dir, farmId, csv_file)
  57. os.makedirs(os.path.dirname(csv_path), exist_ok=True)
  58. df.to_csv(csv_path, index=False)
  59. zip_file = 'jy_{}.{}.{}_{}{}_{}.zip'.format(modeler, model, version, date, moment, tag)
  60. zip_path = os.path.join(tem_dir_zip, zip_file)
  61. zip_folder(temp_dir, zip_path)
  62. shutil.rmtree(temp_dir)
  63. return zip_path, zip_file
  64. def sftp_connect(hostname, port, username, password):
  65. # 创建一个SSH客户端对象
  66. ssh = paramiko.SSHClient()
  67. # 自动添加主机密钥(在生产环境中,建议验证主机密钥)
  68. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  69. try:
  70. # 连接到SSH服务器
  71. ssh.connect(hostname=hostname, port=port, username=username, password=password)
  72. logger.info(f"Connected to {hostname}:{port}")
  73. # 打开SFTP会话
  74. sftp = ssh.open_sftp()
  75. return sftp, ssh
  76. except Exception as e:
  77. logger.info(f"Failed to connect to {hostname}:{port}. Error: {e}")
  78. return None, None
  79. def sftp_close(sftp, ssh):
  80. # 关闭SFTP会话和SSH连接
  81. sftp.close()
  82. ssh.close()
  83. print("SFTP session and SSH connection closed.")
  84. def sftp_upload(sftp, local_file_path, remote_file_path):
  85. try:
  86. # 上传文件
  87. sftp.put(local_file_path, remote_file_path)
  88. logger.info(f"Uploaded {local_file_path} to {remote_file_path}")
  89. except Exception as e:
  90. logger.info(f"Failed to upload {local_file_path}. Error: {e}")
  91. def upload_sftp(zip_path, zip_file, args):
  92. ftp_host, ftp_port_dev, ftp_port_prd, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port_dev'], ftp_params['port_prd'], args['user'], ftp_params[args['user']]['password']
  93. sftp, ssh = sftp_connect(ftp_host, ftp_port_dev, ftp_user, ftp_pass)
  94. if sftp and ssh:
  95. # 上传文件
  96. sftp_upload(sftp, zip_path, '/' + ftp_params[args['user']]['modeler'] + '/'+zip_file)
  97. # 关闭连接
  98. sftp_close(sftp, ssh)
  99. shutil.rmtree(os.path.dirname(zip_path))
  100. def upload_ftp(zip_path, zip_file, args):
  101. ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password']
  102. # 创建 FTP 连接
  103. ftp = FTP()
  104. # 使用被动模式
  105. ftp.set_pasv(True)
  106. # 连接到 FTP 服务器并指定端口
  107. ftp.connect(ftp_host, ftp_port) # 使用自定义端口号
  108. # 登录到 FTP 服务器
  109. ftp.login(ftp_user, ftp_pass)
  110. # 上传文件
  111. with open(zip_path, 'rb') as f:
  112. ftp.storbinary('STOR /' + ftp_params[args['user']]['modeler'] + '/'+zip_file, f)
  113. # 退出 FTP 连接
  114. ftp.quit()
  115. shutil.rmtree(os.path.dirname(zip_path))
  116. # os.remove(zip_path)
  117. logger.info("File uploaded successfully")
  118. @app.route('/pre_data_ftp', methods=['POST'])
  119. def get_nwp_from_ftp():
  120. # 获取程序开始时间
  121. start_time = time.time()
  122. result = {}
  123. success = 0
  124. args = {}
  125. try:
  126. args = request.values.to_dict()
  127. # 1. 获取 mongo 中的预测结果
  128. logger.info(args)
  129. df = get_data_from_mongo(args)
  130. df['date_time'] = pd.to_datetime(df['date_time'])
  131. dfs = df.groupby('farm_id')
  132. model_types = {'cdq': args.get('cdq', 0), 'dq': args.get('dq', 0), 'zcq': args.get('zcq', 0)}
  133. for farm_id, df in dfs:
  134. df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
  135. for tag, status in model_types.items():
  136. if int(status) == 1:
  137. # 2. 将预测结果保存成csv临时文件,命名压缩
  138. zip_path, zip_file = zip_temp_file(df, args, tag)
  139. # 3. 上传到指定的FTP服务器中
  140. upload_sftp(zip_path, zip_file, args)
  141. success = 1
  142. except Exception as e:
  143. my_exception = traceback.format_exc()
  144. my_exception.replace("\n", "\t")
  145. result['msg'] = my_exception
  146. logger.info("预测文件下发ftp出错:{}".format(my_exception))
  147. end_time = time.time()
  148. result['success'] = success
  149. result['args'] = args
  150. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  151. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  152. return result
  153. if __name__ == "__main__":
  154. print("Program starts execution!")
  155. from waitress import serve
  156. serve(app, host="0.0.0.0", port=10101)
  157. print("server start!")
  158. # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
  159. # 'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'j00645_ori_res', 'day_begin':'D1',
  160. # 'day_end': 'D1'}
  161. # df = get_data_from_mongo(args)
  162. # df.rename(columns={'dateTime': 'date_time'}, inplace=True)
  163. # df['date_time'] = pd.to_datetime(df['date_time'])
  164. # zip_path, zip_file = zip_temp_file(df, args)
  165. # upload_ftp(zip_path, zip_file, args)