#!/usr/bin/env python # -*- coding:utf-8 -*- # @FileName :pre_data_ftp.py # @Time :2024/12/26 14:18 # @Author :David # @Company: shenyang JY from ftplib import FTP import pandas as pd from flask import Flask,request,jsonify import time, datetime, os, traceback import logging, zipfile, tempfile, shutil from common.database_dml import get_data_from_mongo app = Flask('pre_data_ftp——service') ftp_params = { 'host' : '192.168.12.20', 'port': 32121, 'zip_mode': 'w', 'liudawei' : { 'user' : 'liudawei', 'password' : 'liudawei@123', 'modeler' : 'koi' }, 'anweiguo':{ 'user' : 'anweiguo', 'password' : 'anweiguo@123', 'modeler' : 'seer' } } def zip_temp_file(df, args): def zip_folder(folder_path, zip_filePath): zip_file = zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED) for root, dirs, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) zip_file.write(file_path, os.path.relpath(file_path, folder_path)) zip_file.close() temp_dir, tem_dir_zip = tempfile.mkdtemp(dir='../cache/data'), tempfile.mkdtemp(dir='../cache/data') dt = df.loc[0, 'date_time'].strftime('%Y%m%d') modeler, model, version, farmId, moment = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId'], args['moment'] csv_file = 'jy_{}.{}{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, dt, moment) csv_path = os.path.join(temp_dir, farmId, csv_file) os.makedirs(os.path.dirname(csv_path), exist_ok=True) df.to_csv(csv_path, index=False) zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, dt, moment) zip_path = os.path.join(tem_dir_zip, zip_file) zip_folder(temp_dir, zip_path) shutil.rmtree(temp_dir) return zip_path, zip_file def upload_ftp(zip_path, zip_file, args): ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password'] # 创建 FTP 连接 ftp = FTP() # 使用主动模式 ftp.set_pasv(False) # 连接到 FTP 服务器并指定端口 ftp.connect(ftp_host, ftp_port) # 使用自定义端口号 # 登录到 FTP 服务器 ftp.login(ftp_user, ftp_pass) # 上传文件 with open(zip_path, 'rb') as f: ftp.storbinary('STOR /' + ftp_params[args['user']]['modeler'] + '/'+zip_file, f) # 退出 FTP 连接 ftp.quit() shutil.rmtree(os.path.dirname(zip_path)) # os.remove(zip_path) print("File uploaded successfully") @app.route('/pre_data_ftp', methods=['POST']) def get_nwp_from_ftp(): # 获取程序开始时间 start_time = time.time() result = {} success = 0 args = {} print("Program starts execution!") try: args = request.values.to_dict() # 1. 获取 mongo 中的预测结果 print('args', args) logger.info(args) df = get_data_from_mongo(args) # 2. 将预测结果保存成csv临时文件,命名压缩 zip_path, zip_file = zip_temp_file(df, args) # 3. 上传到指定的FTP服务器中 upload_ftp(zip_path, zip_file, args) success = 1 except Exception as e: my_exception = traceback.format_exc() my_exception.replace("\n", "\t") result['msg'] = my_exception end_time = time.time() result['success'] = success result['args'] = args result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)) result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)) print("Program execution ends!") return result if __name__ == "__main__": print("Program starts execution!") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("pre_data_ftp") from waitress import serve serve(app, host="0.0.0.0", port=10101) print("server start!") # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06', # 'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'J00645_tmp_w', 'day_begin':'D1', # 'day_end': 'D1'} # df = get_data_from_mongo(args) # df.rename(columns={'dateTime': 'date_time'}, inplace=True) # df['date_time'] = pd.to_datetime(df['date_time']) # zip_path, zip_file = zip_temp_file(df, args) # upload_ftp(zip_path, zip_file, args)