瀏覽代碼

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei 5 月之前
父節點
當前提交
f284d37686
共有 2 個文件被更改,包括 44 次插入3 次删除
  1. 42 2
      data_processing/data_operation/pre_data_ftp.py
  2. 2 1
      requirements.txt

+ 42 - 2
data_processing/data_operation/pre_data_ftp.py

@@ -9,7 +9,7 @@ import pandas as pd
 from flask import Flask, request, jsonify
 from flask import Flask, request, jsonify
 import time, datetime, os, traceback, pytz
 import time, datetime, os, traceback, pytz
 from pytz import timezone
 from pytz import timezone
-import zipfile, tempfile, shutil
+import zipfile, tempfile, shutil, paramiko
 from common.database_dml import get_data_from_mongo
 from common.database_dml import get_data_from_mongo
 from common.logs import Log
 from common.logs import Log
 logger = Log('data-processing').logger
 logger = Log('data-processing').logger
@@ -85,6 +85,46 @@ def zip_temp_file(df, args):
     shutil.rmtree(temp_dir)
     shutil.rmtree(temp_dir)
     return zip_path, zip_file
     return zip_path, zip_file
 
 
+def sftp_connect(hostname, port, username, password):
+    # 创建一个SSH客户端对象
+    ssh = paramiko.SSHClient()
+    # 自动添加主机密钥(在生产环境中,建议验证主机密钥)
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    try:
+        # 连接到SSH服务器
+        ssh.connect(hostname=hostname, port=port, username=username, password=password)
+        logger.info(f"Connected to {hostname}:{port}")
+        # 打开SFTP会话
+        sftp = ssh.open_sftp()
+        return sftp, ssh
+    except Exception as e:
+        logger.info(f"Failed to connect to {hostname}:{port}. Error: {e}")
+        return None, None
+
+def sftp_close(sftp, ssh):
+    # 关闭SFTP会话和SSH连接
+    sftp.close()
+    ssh.close()
+    print("SFTP session and SSH connection closed.")
+
+def sftp_upload(sftp, local_file_path, remote_file_path):
+    try:
+        # 上传文件
+        sftp.put(local_file_path, remote_file_path)
+        logger.info(f"Uploaded {local_file_path} to {remote_file_path}")
+    except Exception as e:
+        logger.info(f"Failed to upload {local_file_path}. Error: {e}")
+
+def upload_sftp(zip_path, zip_file, args):
+    ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password']
+    sftp, ssh = sftp_connect(ftp_host, ftp_port, ftp_user, ftp_pass)
+    if sftp and ssh:
+        # 上传文件
+        with open(zip_path, 'rb') as f:
+            sftp_upload(sftp, f, 'STOR /' + ftp_params[args['user']]['modeler'] + '/'+zip_file)
+    # 关闭连接
+    sftp_close(sftp, ssh)
+
 def upload_ftp(zip_path, zip_file, args):
 def upload_ftp(zip_path, zip_file, args):
     ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password']
     ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password']
     # 创建 FTP 连接
     # 创建 FTP 连接
@@ -126,7 +166,7 @@ def get_nwp_from_ftp():
         # 2. 将预测结果保存成csv临时文件,命名压缩
         # 2. 将预测结果保存成csv临时文件,命名压缩
         zip_path, zip_file = zip_temp_file(df, args)
         zip_path, zip_file = zip_temp_file(df, args)
         # 3. 上传到指定的FTP服务器中
         # 3. 上传到指定的FTP服务器中
-        upload_ftp(zip_path, zip_file, args)
+        upload_sftp(zip_path, zip_file, args)
         success = 1
         success = 1
     except Exception as e:
     except Exception as e:
         my_exception = traceback.format_exc()
         my_exception = traceback.format_exc()

+ 2 - 1
requirements.txt

@@ -16,4 +16,5 @@ tensorflow==2.2.0
 matplotlib==3.5.3
 matplotlib==3.5.3
 Keras==2.3.1
 Keras==2.3.1
 protobuf==3.20.3
 protobuf==3.20.3
-APScheduler==3.10.4
+APScheduler==3.10.4
+paramiko==3.5.0