123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :pre_data_ftp.py
- # @Time :2024/12/26 14:18
- # @Author :David
- # @Company: shenyang JY
- from ftplib import FTP
- import pandas as pd
- from flask import Flask, request, jsonify
- import time, datetime, os, traceback, pytz
- from pytz import timezone
- import logging, zipfile, tempfile, shutil
- from common.database_dml import get_data_from_mongo
# Flask application object; the route handler below is registered on it.
app = Flask(import_name='pre_data_ftp——service')
# Static configuration shared by zip_temp_file() and upload_ftp().
#   local_dir : parent directory for the temporary CSV / zip folders
#   host/port : FTP server address
#   zip_mode  : mode string handed to zipfile.ZipFile
#   <user>    : per-account login plus the 'modeler' tag used in file names
#               and as the remote upload directory
# NOTE(review): account passwords are stored in source; consider moving them
# to external configuration.
ftp_params = dict(
    local_dir='./data_processing/cache/data',
    host='192.168.12.20',
    port=32121,
    zip_mode='w',
    liudawei=dict(
        user='liudawei',
        password='liudawei@123',
        modeler='koi',
    ),
    anweiguo=dict(
        user='anweiguo',
        password='anweiguo@123',
        modeler='seer',
    ),
)
def zip_temp_file(df, args):
    """Dump ``df`` to a CSV in a scratch directory and pack it into a zip archive.

    The CSV is named ``jy_{modeler}.{model}.{version}_{farmId}_{date}{moment}_dq.csv``
    and is placed under a ``{farmId}/`` folder inside the archive. The archive
    itself is named ``jy_{modeler}.{model}.{version}_{date}{moment}_dq.zip``.
    ``date`` is the current day (``%Y%m%d``) in the Asia/Shanghai time zone.

    Args:
        df: Object providing ``to_csv(path, index=False)``.
        args: Mapping with the keys ``user`` (must be a key of ``ftp_params``),
            ``model``, ``version``, ``farmId`` and ``moment``.

    Returns:
        tuple: ``(zip_path, zip_file)`` - the full path of the archive, located
        in its own temporary directory under ``ftp_params['local_dir']``, and
        the bare archive file name.

    Raises:
        KeyError: If ``args`` lacks a required key or ``args['user']`` is not
            configured in ``ftp_params``.
    """
    def zip_folder(folder_path, zip_filePath):
        """Add every file below ``folder_path`` to a new archive, using relative paths."""
        # The context manager closes the archive even if a write fails.
        with zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED) as archive:
            for root, _dirs, files in os.walk(folder_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    archive.write(file_path, os.path.relpath(file_path, folder_path))

    # Resolve all request-dependent values before touching the filesystem so
    # that a malformed request cannot leave temporary directories behind.
    modeler = ftp_params[args['user']]['modeler']
    model, version, farmId, moment = args['model'], args['version'], args['farmId'], args['moment']
    current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    dt = current_time.strftime('%Y%m%d')
    csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, dt, moment)
    zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, dt, moment)

    # mkdtemp() requires its parent directory to exist.
    local_dir = ftp_params['local_dir']
    os.makedirs(local_dir, exist_ok=True)

    temp_dir = tempfile.mkdtemp(dir=local_dir)
    try:
        tem_dir_zip = tempfile.mkdtemp(dir=local_dir)
        try:
            csv_path = os.path.join(temp_dir, farmId, csv_file)
            os.makedirs(os.path.dirname(csv_path), exist_ok=True)
            df.to_csv(csv_path, index=False)
            zip_path = os.path.join(tem_dir_zip, zip_file)
            zip_folder(temp_dir, zip_path)
        except Exception:
            # The archive directory is only useful to the caller on success.
            shutil.rmtree(tem_dir_zip, ignore_errors=True)
            raise
    finally:
        # The CSV staging directory is never needed after this function returns.
        shutil.rmtree(temp_dir, ignore_errors=True)
    return zip_path, zip_file
def upload_ftp(zip_path, zip_file, args):
    """Upload a zip archive to the configured FTP server and remove its temp directory.

    The file is stored as ``/{modeler}/{zip_file}`` on the server, where
    ``modeler`` comes from the ``ftp_params`` entry of ``args['user']``.
    The connection uses active (non-passive) mode.

    Args:
        zip_path: Local path of the archive to upload.
        zip_file: File name to use on the server.
        args: Mapping whose ``user`` key selects the FTP account in ``ftp_params``.

    Raises:
        KeyError: If ``args['user']`` is missing or not configured.
        OSError, ftplib.all_errors: Propagated from connecting, logging in,
            opening the file or transferring it.
    """
    try:
        account = ftp_params[args['user']]
        ftp_host, ftp_port = ftp_params['host'], ftp_params['port']
        ftp_user, ftp_pass = args['user'], account['password']
        # The context manager sends QUIT / closes the socket even when the
        # login or the transfer raises.
        with FTP() as ftp:
            # Use active mode.
            ftp.set_pasv(False)
            # Connect on the custom port, then authenticate.
            ftp.connect(ftp_host, ftp_port)
            ftp.login(ftp_user, ftp_pass)
            # Upload the file.
            with open(zip_path, 'rb') as f:
                ftp.storbinary('STOR /' + account['modeler'] + '/' + zip_file, f)
    finally:
        # Always drop the temporary archive directory, also after a failed upload.
        shutil.rmtree(os.path.dirname(zip_path), ignore_errors=True)
    print("File uploaded successfully")
@app.route('/pre_data_ftp', methods=['POST'])
def get_nwp_from_ftp():
    """Handle ``POST /pre_data_ftp``: fetch data, zip it as CSV and upload it via FTP.

    The request values are turned into a dict and passed unchanged to
    ``get_data_from_mongo``, ``zip_temp_file`` and ``upload_ftp``.

    Returns:
        dict: ``success`` (1 on success, 0 on failure), ``args`` (the request
        values), ``start_time`` / ``end_time`` (local time,
        ``%Y-%m-%d %H:%M:%S``) and, on failure only, ``msg`` holding the
        traceback with newlines replaced by tabs.
    """
    # Look the logger up here: a module-level ``logger`` is only bound when the
    # file runs as a script, so relying on it breaks under other WSGI runners.
    log = logging.getLogger("pre_data_ftp")
    # Record when processing started.
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        # 1. Read the data from MongoDB.
        print('args', args)
        log.info(args)
        df = get_data_from_mongo(args)
        df['date_time'] = pd.to_datetime(df['date_time'])
        df = df.sort_values(by='date_time')
        # 2. Save the data as a temporary CSV file, then name and compress it.
        zip_path, zip_file = zip_temp_file(df, args)
        # 3. Upload the archive to the configured FTP server.
        upload_ftp(zip_path, zip_file, args)
        success = 1
    except Exception:
        log.exception("pre_data_ftp request failed")
        # str.replace returns a new string; keep the result so the message is
        # actually single-line.
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
# Script entry point: configure logging and serve the Flask app with waitress.
if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("pre_data_ftp")
    from waitress import serve
    # serve() blocks until the server shuts down, so announce start-up first.
    print("server start!")
    serve(app, host="0.0.0.0", port=10101)
    # Manual debugging example (bypasses the HTTP layer):
    # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
    #         'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'J00645_tmp_w', 'day_begin':'D1',
    #         'day_end': 'D1'}
    # df = get_data_from_mongo(args)
    # df.rename(columns={'dateTime': 'date_time'}, inplace=True)
    # df['date_time'] = pd.to_datetime(df['date_time'])
    # zip_path, zip_file = zip_temp_file(df, args)
    # upload_ftp(zip_path, zip_file, args)
|