pre_data_ftp.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :pre_data_ftp.py
  4. # @Time :2024/12/26 14:18
  5. # @Author :David
  6. # @Company: shenyang JY
  7. from ftplib import FTP
  8. import pandas as pd
  9. from flask import Flask, request, jsonify
  10. import time, datetime, os, traceback, pytz
  11. from pytz import timezone
  12. import zipfile, tempfile, shutil
  13. from common.database_dml import get_data_from_mongo
  14. from common.logs import Log
  15. logger = Log('data-processing').logger
  16. app = Flask('pre_data_ftp——service')
  17. ftp_params = {
  18. 'local_dir' : './data_processing/cache/data',
  19. # 'host' : '192.168.12.20',
  20. # 'port': 32121,
  21. 'host': '192.168.1.33',
  22. 'port': 2022,
  23. 'zip_mode': 'w',
  24. 'liudawei' : {
  25. 'user' : 'liudawei',
  26. 'password' : 'liudawei@123',
  27. 'modeler' : 'koi'
  28. },
  29. 'anweiguo':{
  30. 'user' : 'anweiguo',
  31. 'password' : 'anweiguo@123',
  32. 'modeler' : 'seer'
  33. },
  34. 'wulianlong':{
  35. 'user': 'wulianlong',
  36. 'password': 'wulianlong@123',
  37. 'modeler': 'lucky'
  38. }
  39. }
  40. def get_moment_next(schedule_dt=False):
  41. if schedule_dt:
  42. now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
  43. else:
  44. now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
  45. date = now.strftime('%Y%m%d')
  46. if now.hour == 18:
  47. moment = '18'
  48. elif now.hour > 18:
  49. moment = '00'
  50. elif now.hour == 12:
  51. moment = '12'
  52. elif now.hour > 12:
  53. moment = '18'
  54. elif now.hour == 6:
  55. moment = '06'
  56. elif now.hour > 6:
  57. moment = '12'
  58. elif 2 >= now.hour >= 0:
  59. moment = '00'
  60. else:
  61. moment = '06'
  62. return date, moment
  63. def zip_temp_file(df, args):
  64. def zip_folder(folder_path, zip_filePath):
  65. zip_file = zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED)
  66. for root, dirs, files in os.walk(folder_path):
  67. for file in files:
  68. file_path = os.path.join(root, file)
  69. zip_file.write(file_path, os.path.relpath(file_path, folder_path))
  70. zip_file.close()
  71. temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
  72. date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
  73. modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
  74. csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, date, moment)
  75. csv_path = os.path.join(temp_dir, farmId, csv_file)
  76. os.makedirs(os.path.dirname(csv_path), exist_ok=True)
  77. df.to_csv(csv_path, index=False)
  78. zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, date, moment)
  79. zip_path = os.path.join(tem_dir_zip, zip_file)
  80. zip_folder(temp_dir, zip_path)
  81. shutil.rmtree(temp_dir)
  82. return zip_path, zip_file
  83. def upload_ftp(zip_path, zip_file, args):
  84. ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password']
  85. # 创建 FTP 连接
  86. ftp = FTP()
  87. # 使用被动模式
  88. ftp.set_pasv(True)
  89. # 连接到 FTP 服务器并指定端口
  90. ftp.connect(ftp_host, ftp_port) # 使用自定义端口号
  91. # 登录到 FTP 服务器
  92. ftp.login(ftp_user, ftp_pass)
  93. # 上传文件
  94. with open(zip_path, 'rb') as f:
  95. ftp.storbinary('STOR /' + ftp_params[args['user']]['modeler'] + '/'+zip_file, f)
  96. # 退出 FTP 连接
  97. ftp.quit()
  98. shutil.rmtree(os.path.dirname(zip_path))
  99. # os.remove(zip_path)
  100. logger.info("File uploaded successfully")
  101. @app.route('/pre_data_ftp', methods=['POST'])
  102. def get_nwp_from_ftp():
  103. # 获取程序开始时间
  104. start_time = time.time()
  105. result = {}
  106. success = 0
  107. args = {}
  108. try:
  109. args = request.values.to_dict()
  110. # 1. 获取 mongo 中的预测结果
  111. logger.info(args)
  112. df = get_data_from_mongo(args)
  113. df['date_time'] = pd.to_datetime(df['date_time'])
  114. df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
  115. # 2. 将预测结果保存成csv临时文件,命名压缩
  116. zip_path, zip_file = zip_temp_file(df, args)
  117. # 3. 上传到指定的FTP服务器中
  118. upload_ftp(zip_path, zip_file, args)
  119. success = 1
  120. except Exception as e:
  121. my_exception = traceback.format_exc()
  122. my_exception.replace("\n", "\t")
  123. result['msg'] = my_exception
  124. logger.info("预测文件下发ftp出错:{}".format(my_exception))
  125. end_time = time.time()
  126. result['success'] = success
  127. result['args'] = args
  128. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  129. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  130. return result
  131. if __name__ == "__main__":
  132. print("Program starts execution!")
  133. from waitress import serve
  134. serve(app, host="0.0.0.0", port=10101)
  135. print("server start!")
  136. # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
  137. # 'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'j00645_ori_res', 'day_begin':'D1',
  138. # 'day_end': 'D1'}
  139. # df = get_data_from_mongo(args)
  140. # df.rename(columns={'dateTime': 'date_time'}, inplace=True)
  141. # df['date_time'] = pd.to_datetime(df['date_time'])
  142. # zip_path, zip_file = zip_temp_file(df, args)
  143. # upload_ftp(zip_path, zip_file, args)