123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :pre_data_ftp.py
- # @Time :2024/12/26 14:18
- # @Author :David
- # @Company: shenyang JY
- from ftplib import FTP
- import pandas as pd
- from flask import Flask, request, jsonify
- import time, datetime, os, traceback, pytz
- from pytz import timezone
- import zipfile, tempfile, shutil, paramiko
- from common.database_dml import get_data_from_mongo
- from common.logs import Log
# Module-level logger shared by every helper in this file.
logger = Log('data-processing').logger

# Flask application object; the '/pre_data_ftp' route is registered on it.
app = Flask('pre_data_ftp——service')
# Settings shared by the zip/upload helpers in this module.
# NOTE(review): account passwords are hard-coded here; consider loading them
# from the environment or a secrets store instead of keeping them in source.
ftp_params = {
    # Directory in which the temporary CSV and zip folders are created.
    'local_dir': './data_processing/cache/data',
    # Alternative endpoint, currently disabled:
    # 'host': '192.168.12.20',
    # 'port': 32121,
    'host': '192.168.1.33',
    # SFTP port used for the per-modeler folder upload.
    'port_dev': 2022,
    # SFTP port used for the /cdq, /dq and /zq uploads.
    'port_prd': 2025,
    # Mode passed to zipfile.ZipFile when the archive is written.
    'zip_mode': 'w',
    # One entry per account: login name, password and the "modeler" alias
    # that appears in file names and in the remote folder path.
    'liudawei': dict(user='liudawei', password='liudawei@123', modeler='koi'),
    'anweiguo': dict(user='anweiguo', password='anweiguo@123', modeler='seer'),
    'wulianlong': dict(user='wulianlong', password='wulianlong@123', modeler='lucky'),
}
def get_moment_next(schedule_dt=False):
    """Return the date stamp and forecast slot for the given or current time.

    Args:
        schedule_dt: Optional timestamp whose ``str()`` form matches
            ``'%Y-%m-%d %H:%M:%S'``. When falsy, the current time in the
            Asia/Shanghai zone is used instead.

    Returns:
        tuple[str, str]: ``(date, moment)`` where ``date`` is ``YYYYMMDD`` and
        ``moment`` is one of ``'00'``, ``'06'``, ``'12'`` or ``'18'``.

    Raises:
        ValueError: If ``schedule_dt`` is given but does not match the format.
    """
    if not schedule_dt:
        current = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    else:
        current = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')

    hour = current.hour
    # Slot table by hour of day:
    #   00-02 -> '00', 03-06 -> '06', 07-12 -> '12', 13-18 -> '18', 19-23 -> '00'
    if hour <= 2 or hour > 18:
        slot = '00'
    elif hour <= 6:
        slot = '06'
    elif hour <= 12:
        slot = '12'
    else:
        slot = '18'

    # The date is always the calendar day of `current`, including for the
    # late-evening hours that map to the '00' slot.
    return current.strftime('%Y%m%d'), slot
def zip_temp_file(df, args):
    """Write ``df`` to a CSV inside a temp folder and pack that folder into a zip.

    The CSV is stored as ``<farmId>/jy_<modeler>.<model>.<version>_<farmId>_<date><moment>_dq.csv``
    and the archive is named ``jy_<modeler>.<model>.<version>_<date><moment>_dq.zip``.
    ``date`` and ``moment`` come from ``get_moment_next`` (using ``args['dt']``
    when present).

    Args:
        df: Object exposing ``to_csv(path, index=False)`` (a DataFrame in this file).
        args: Mapping with the keys ``'user'``, ``'model'``, ``'version'`` and
            ``'farmId'``, plus an optional ``'dt'`` timestamp.

    Returns:
        tuple[str, str]: ``(zip_path, zip_file)`` - the full path of the archive
        (inside its own temporary directory) and its bare file name. The caller
        is responsible for removing the archive's directory.

    Raises:
        KeyError: If a required key is missing from ``args`` or the user is unknown.
    """
    def zip_folder(folder_path, zip_filePath):
        # The context manager closes the archive even when a write fails.
        with zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED) as archive:
            for root, _dirs, files in os.walk(folder_path):
                for name in files:
                    file_path = os.path.join(root, name)
                    # Store entries relative to the folder so the zip has no temp prefix.
                    archive.write(file_path, os.path.relpath(file_path, folder_path))

    local_dir = ftp_params['local_dir']
    # mkdtemp() needs the parent directory to exist already.
    os.makedirs(local_dir, exist_ok=True)

    temp_dir = tempfile.mkdtemp(dir=local_dir)
    tem_dir_zip = None
    try:
        tem_dir_zip = tempfile.mkdtemp(dir=local_dir)
        date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
        modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']

        csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, date, moment)
        csv_path = os.path.join(temp_dir, farmId, csv_file)
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        df.to_csv(csv_path, index=False)

        zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, date, moment)
        zip_path = os.path.join(tem_dir_zip, zip_file)
        zip_folder(temp_dir, zip_path)
    except Exception:
        # Nothing will be returned, so do not leave the archive folder behind.
        if tem_dir_zip is not None:
            shutil.rmtree(tem_dir_zip, ignore_errors=True)
        raise
    finally:
        # The CSV staging folder is never needed once the archive is written (or failed).
        shutil.rmtree(temp_dir)
    return zip_path, zip_file
def sftp_connect(hostname, port, username, password):
    """Open an SSH connection and an SFTP session on top of it.

    Args:
        hostname: Server address.
        port: Server port.
        username: Login name.
        password: Login password.

    Returns:
        tuple: ``(sftp, ssh)`` on success, ``(None, None)`` if connecting or
        opening the SFTP session failed (the error is logged, not raised).
    """
    # Create the SSH client object.
    ssh = paramiko.SSHClient()
    # Unknown host keys are accepted automatically (in production, verifying
    # the host key is recommended).
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Connect to the SSH server.
        ssh.connect(hostname=hostname, port=port, username=username, password=password)
        logger.info(f"Connected to {hostname}:{port}")
        # Open the SFTP session.
        sftp = ssh.open_sftp()
        return sftp, ssh
    except Exception as e:
        logger.info(f"Failed to connect to {hostname}:{port}. Error: {e}")
        # Release the client so a half-open connection (e.g. connect() worked
        # but open_sftp() failed) is not leaked.
        ssh.close()
        return None, None
def sftp_close(sftp, ssh):
    """Close the SFTP session and the SSH connection.

    Either argument may be ``None`` (as returned by a failed ``sftp_connect``);
    such handles are skipped instead of raising ``AttributeError``.

    Args:
        sftp: SFTP session object exposing ``close()``, or ``None``.
        ssh: SSH client object exposing ``close()``, or ``None``.
    """
    try:
        if sftp is not None:
            sftp.close()
    finally:
        # Close the SSH connection even if closing the SFTP session raised.
        if ssh is not None:
            ssh.close()
    print("SFTP session and SSH connection closed.")
def sftp_upload(sftp, local_file_path, remote_file_path):
    """Upload one local file over an open SFTP session, logging the outcome.

    Failures are logged and swallowed; nothing is raised to the caller.

    Args:
        sftp: SFTP session exposing ``put(local, remote)``.
        local_file_path: Path of the file to send.
        remote_file_path: Destination path on the server.
    """
    outcome = f"Uploaded {local_file_path} to {remote_file_path}"
    try:
        # Transfer the file.
        sftp.put(local_file_path, remote_file_path)
    except Exception as err:
        outcome = f"Failed to upload {local_file_path}. Error: {err}"
    logger.info(outcome)
def upload_sftp(zip_path, zip_file, args, cdq, dq, zq):
    """Upload the archive to the modeler folder and to each enabled product folder.

    The archive always goes to ``/<modeler>/<zip_file>`` on the ``port_dev``
    server. For each truthy flag among ``cdq``, ``dq`` and ``zq`` it is also
    sent to ``/cdq``, ``/dq`` or ``/zq`` on the ``port_prd`` server. A fresh
    connection is opened for every upload. The directory containing
    ``zip_path`` is removed afterwards, whether or not the uploads succeeded.

    Args:
        zip_path: Local path of the archive.
        zip_file: File name to use on the server.
        args: Mapping whose ``'user'`` key selects the account in ``ftp_params``.
        cdq: Truthy to upload to ``/cdq``.
        dq: Truthy to upload to ``/dq``.
        zq: Truthy to upload to ``/zq``.

    Raises:
        KeyError: If ``args['user']`` is missing or not a configured account.
        ConnectionError: If a ``port_prd`` connection could not be established.
            Raised only after every target was attempted and the temporary
            directory was removed. A failed ``port_dev`` connection is skipped
            without raising.
    """
    try:
        account = ftp_params[args['user']]
        ftp_host, ftp_user, ftp_pass = ftp_params['host'], args['user'], account['password']
        ftp_port_dev, ftp_port_prd = ftp_params['port_dev'], ftp_params['port_prd']

        # (port, remote path, whether a failed connection must be reported)
        targets = [(ftp_port_dev, '/' + account['modeler'] + '/' + zip_file, False)]
        for enabled, folder in ((cdq, '/cdq'), (dq, '/dq'), (zq, '/zq')):
            if enabled:
                targets.append((ftp_port_prd, folder + '/' + zip_file, True))

        unreachable = []
        for port, remote_path, must_report in targets:
            sftp, ssh = sftp_connect(ftp_host, port, ftp_user, ftp_pass)
            if not (sftp and ssh):
                # sftp_connect already logged the error; never hand None to
                # sftp_close, which would fail and abort the remaining targets.
                if must_report:
                    unreachable.append(remote_path)
                continue
            try:
                sftp_upload(sftp, zip_path, remote_path)
            finally:
                sftp_close(sftp, ssh)
    finally:
        # Always drop the temporary archive directory, even on failure.
        shutil.rmtree(os.path.dirname(zip_path))

    if unreachable:
        raise ConnectionError(
            "Could not connect to {}:{} to upload: {}".format(ftp_host, ftp_port_prd, ', '.join(unreachable))
        )
def upload_ftp(zip_path, zip_file, args):
    """Upload the archive to ``/<modeler>/<zip_file>`` over plain FTP.

    The directory containing ``zip_path`` is removed afterwards, whether or
    not the upload succeeded.

    Args:
        zip_path: Local path of the archive.
        zip_file: File name to use on the server.
        args: Mapping whose ``'user'`` key selects the account in ``ftp_params``.

    Raises:
        KeyError: If ``args['user']`` is missing or not a configured account, or
            if ``ftp_params`` has no ``'port'`` entry.
        ftplib.all_errors: Any connection, login or transfer error from ftplib.
    """
    try:
        # NOTE(review): ftp_params currently defines only 'port_dev' and
        # 'port_prd' (its 'port' entry is commented out), so this lookup raises
        # KeyError until a 'port' value is configured.
        ftp_host, ftp_port = ftp_params['host'], ftp_params['port']
        ftp_user, ftp_pass = args['user'], ftp_params[args['user']]['password']
        remote_path = '/' + ftp_params[args['user']]['modeler'] + '/' + zip_file

        # The context manager sends QUIT and closes the socket even if the
        # login or the transfer fails.
        with FTP() as ftp:
            # Use passive mode.
            ftp.set_pasv(True)
            # Connect on the configured (non-default) port, then log in.
            ftp.connect(ftp_host, ftp_port)
            ftp.login(ftp_user, ftp_pass)
            # Upload the file.
            with open(zip_path, 'rb') as f:
                ftp.storbinary('STOR ' + remote_path, f)
    finally:
        # Always drop the temporary archive directory, even on failure.
        shutil.rmtree(os.path.dirname(zip_path))
    logger.info("File uploaded successfully")
@app.route('/pre_data_ftp', methods=['POST'])
def get_nwp_from_ftp():
    """Handle POST /pre_data_ftp: export forecasts per farm and upload them via SFTP.

    Request values are passed unchanged to ``get_data_from_mongo``,
    ``zip_temp_file`` and ``upload_sftp``. The rows are grouped by ``farm_id``;
    each group is sorted by ``date_time``, reduced to the columns ``farm_id``,
    ``date_time`` and ``power_forecast``, zipped and uploaded. When the columns
    ``cdq``, ``dq`` and ``zq`` are all present, their values in the group's
    first row select the additional upload folders; otherwise all three
    default to 0.

    Returns:
        dict: ``success`` (1 or 0), ``args``, ``start_time``, ``end_time`` and,
        on failure, ``msg`` holding the traceback with newlines replaced by tabs.
    """
    # Record when processing started.
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    try:
        args = request.values.to_dict()
        logger.info(args)
        # 1. Fetch the forecast results from MongoDB.
        df = get_data_from_mongo(args)
        df['date_time'] = pd.to_datetime(df['date_time'])
        for _farm_id, farm_df in df.groupby('farm_id'):
            if {'cdq', 'dq', 'zq'}.issubset(set(farm_df.columns.tolist())):
                cdq, dq, zq = farm_df.loc[farm_df.index[0], ['cdq', 'dq', 'zq']]
            else:
                cdq, dq, zq = 0, 0, 0
            farm_df = farm_df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
            # 2. Save the forecast as a temporary CSV, then name and zip it.
            zip_path, zip_file = zip_temp_file(farm_df, args)
            # 3. Upload the archive to the configured server.
            upload_sftp(zip_path, zip_file, args, cdq, dq, zq)
        success = 1
    except Exception:
        # str.replace returns a new string; keep the result so the traceback
        # really is flattened onto one line.
        my_exception = traceback.format_exc().replace("\n", "\t")
        result['msg'] = my_exception
        logger.info("预测文件下发ftp出错:{}".format(my_exception))
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    return result
# Script entry point: serve the Flask app with waitress on all interfaces, port 10101.
if __name__ == "__main__":
    print("Program starts execution!")
    import waitress

    waitress.serve(app, host="0.0.0.0", port=10101)
    # NOTE(review): waitress.serve() presumably blocks until the server stops,
    # so this message would appear only after shutdown - confirm.
    print("server start!")

    # Manual run without the HTTP layer, kept for reference:
    # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
    # 'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'j00645_ori_res', 'day_begin':'D1',
    # 'day_end': 'D1'}
    # df = get_data_from_mongo(args)
    # df.rename(columns={'dateTime': 'date_time'}, inplace=True)
    # df['date_time'] = pd.to_datetime(df['date_time'])
    # zip_path, zip_file = zip_temp_file(df, args)
    # upload_ftp(zip_path, zip_file, args)
|