#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName  :pre_data_ftp.py
# @Time      :2024/12/26 14:18
# @Author    :David
# @Company: shenyang JY
"""Flask service that ships forecast results from MongoDB to an FTP server.

A POST to ``/pre_data_ftp`` reads a result set through
``get_data_from_mongo``, writes it to a CSV file, zips it and uploads the
archive to the FTP account selected by the ``user`` request parameter.
"""
import datetime
import logging
import os
import shutil
import tempfile
import time
import traceback
import zipfile
from ftplib import FTP

import pandas as pd
import pytz
from flask import Flask, request, jsonify
from pytz import timezone

from common.database_dml import get_data_from_mongo

app = Flask('pre_data_ftp——service')

# Defined at module level so the route also works when this module is imported
# by another server process (previously it only existed under ``__main__``).
logger = logging.getLogger("pre_data_ftp")

# NOTE(review): FTP credentials are hard-coded in source control; consider
# loading them from the environment or a secret store.
ftp_params = {
    'local_dir': './data_processing/cache/data',
    'host': '192.168.12.20',
    'port': 32121,
    'zip_mode': 'w',
    'liudawei': {
        'user': 'liudawei',
        'password': 'liudawei@123',
        'modeler': 'koi',
    },
    'anweiguo': {
        'user': 'anweiguo',
        'password': 'anweiguo@123',
        'modeler': 'seer',
    },
}


def _get_account(user):
    """Return the FTP account settings for *user*.

    Raises:
        KeyError: if *user* is not one of the account entries of
            ``ftp_params`` (scalar settings such as ``'host'`` are rejected).
    """
    account = ftp_params.get(user)
    if not isinstance(account, dict):
        raise KeyError(user)
    return account


def _ensure_inside(path, base_dir):
    """Raise ``ValueError`` unless *path* resolves to a location under *base_dir*."""
    base = os.path.realpath(base_dir)
    target = os.path.realpath(path)
    if os.path.commonpath([base, target]) != base:
        raise ValueError('path {!r} escapes directory {!r}'.format(path, base_dir))


def _zip_folder(folder_path, zip_filePath):
    """Write every file under *folder_path* into a new archive at *zip_filePath*.

    Archive member names are relative to *folder_path*.
    """
    with zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(folder_path):
            for name in files:
                file_path = os.path.join(root, name)
                archive.write(file_path, os.path.relpath(file_path, folder_path))


def zip_temp_file(df, args):
    """Dump *df* to a CSV file and pack it into a zip archive.

    Args:
        df: object exposing ``to_csv(path, index=False)`` (a pandas DataFrame
            in the route below).
        args: mapping providing ``user``, ``model``, ``version``, ``farmId``
            and ``moment``; they are used to build the file names.

    Returns:
        ``(zip_path, zip_file)``: full path of the archive and its base name.
        The archive lives in its own temporary directory under
        ``ftp_params['local_dir']``; ``upload_ftp`` removes that directory.

    Raises:
        KeyError: on a missing argument or unknown ``user``.
        ValueError: if the request values would place a file outside the
            temporary directories.
    """
    # Read all inputs before creating anything on disk so that a bad request
    # leaves no temporary directories behind.
    modeler = _get_account(args['user'])['modeler']
    model, version, farmId, moment = args['model'], args['version'], args['farmId'], args['moment']

    current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    dt = current_time.strftime('%Y%m%d')
    csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, dt, moment)
    zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, dt, moment)

    local_dir = ftp_params['local_dir']
    os.makedirs(local_dir, exist_ok=True)  # mkdtemp requires the parent to exist

    temp_dir = tempfile.mkdtemp(dir=local_dir)
    tem_dir_zip = None
    try:
        tem_dir_zip = tempfile.mkdtemp(dir=local_dir)

        csv_path = os.path.join(temp_dir, farmId, csv_file)
        zip_path = os.path.join(tem_dir_zip, zip_file)
        # The name parts come from the HTTP request; refuse path traversal.
        _ensure_inside(csv_path, temp_dir)
        _ensure_inside(zip_path, tem_dir_zip)

        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        df.to_csv(csv_path, index=False)
        _zip_folder(temp_dir, zip_path)
    except BaseException:
        # The caller never learns about the archive directory on failure.
        if tem_dir_zip is not None:
            shutil.rmtree(tem_dir_zip, ignore_errors=True)
        raise
    finally:
        # The CSV staging directory is only needed while zipping.
        shutil.rmtree(temp_dir, ignore_errors=True)
    return zip_path, zip_file


def upload_ftp(zip_path, zip_file, args):
    """Upload the archive at *zip_path* to ``/<modeler>/<zip_file>`` over FTP.

    The account is selected by ``args['user']``. The directory containing
    *zip_path* is removed afterwards, whether or not the upload succeeded.

    Raises:
        KeyError: on a missing or unknown ``user``.
        ftplib.all_errors members / OSError: on connection or transfer errors.
    """
    try:
        account = _get_account(args['user'])
        ftp_host, ftp_port = ftp_params['host'], ftp_params['port']
        ftp_user, ftp_pass = args['user'], account['password']
        remote_path = '/' + account['modeler'] + '/' + zip_file

        # The context manager closes the connection even if a step below fails.
        with FTP() as ftp:
            # Use active mode.
            ftp.set_pasv(False)
            # Connect to the FTP server on the custom port.
            ftp.connect(ftp_host, ftp_port)
            ftp.login(ftp_user, ftp_pass)
            with open(zip_path, 'rb') as f:
                ftp.storbinary('STOR ' + remote_path, f)
    finally:
        shutil.rmtree(os.path.dirname(zip_path), ignore_errors=True)
    print("File uploaded successfully")


@app.route('/pre_data_ftp', methods=['POST'])
def get_nwp_from_ftp():
    """Handle ``POST /pre_data_ftp``: export results from MongoDB to FTP.

    The request values are passed unchanged to ``get_data_from_mongo``,
    ``zip_temp_file`` and ``upload_ftp``.

    Returns:
        dict with ``success`` (1 or 0), ``args``, ``start_time``, ``end_time``
        and, on failure, ``msg`` holding the traceback on a single line.
    """
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        # 1. Fetch the forecast results from MongoDB.
        print('args', args)
        logger.info(args)
        df = get_data_from_mongo(args)
        df['date_time'] = pd.to_datetime(df['date_time'])
        df = df.sort_values(by='date_time')
        # 2. Save the results as a temporary CSV file and zip it.
        zip_path, zip_file = zip_temp_file(df, args)
        # 3. Upload the archive to the configured FTP server.
        upload_ftp(zip_path, zip_file, args)
        success = 1
    except Exception:
        # str.replace returns a new string; keep the flattened traceback.
        my_exception = traceback.format_exc().replace("\n", "\t")
        logger.exception("pre_data_ftp request failed")
        result['msg'] = my_exception
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    from waitress import serve
    # serve() blocks until shutdown, so announce the start beforehand.
    print("server start!")
    serve(app, host="0.0.0.0", port=10101)

    # Manual invocation example, kept for reference:
    # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
    #         'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'J00645_tmp_w', 'day_begin':'D1',
    #         'day_end': 'D1'}
    # df = get_data_from_mongo(args)
    # df.rename(columns={'dateTime': 'date_time'}, inplace=True)
    # df['date_time'] = pd.to_datetime(df['date_time'])
    # zip_path, zip_file = zip_temp_file(df, args)
    # upload_ftp(zip_path, zip_file, args)