Browse Source

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei 3 months ago
parent
commit
c1b9a21c60
1 changed files with 25 additions and 9 deletions
  1. 25 9
      data_processing/data_operation/pre_data_ftp.py

+ 25 - 9
data_processing/data_operation/pre_data_ftp.py

@@ -20,7 +20,8 @@ ftp_params = {
     # 'host' : '192.168.12.20',
     # 'port': 32121,
     'host': '192.168.1.33',
-    'port': 2022,
+    'port_dev': 2022,
+    'port_prd': 2025,
     'zip_mode': 'w',
     'liudawei' : {
         'user' : 'liudawei',
@@ -114,14 +115,26 @@ def sftp_upload(sftp, local_file_path, remote_file_path):
     except Exception as e:
         logger.info(f"Failed to upload {local_file_path}. Error: {e}")
 
-def upload_sftp(zip_path, zip_file, args):
-    ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password']
-    sftp, ssh = sftp_connect(ftp_host, ftp_port, ftp_user, ftp_pass)
+def upload_sftp(zip_path, zip_file, args, cdq, dq, zq):
+    ftp_host, ftp_port_dev, ftp_port_prd, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port_dev'], ftp_params['port_prd'], args['user'], ftp_params[args['user']]['password']
+    sftp, ssh = sftp_connect(ftp_host, ftp_port_dev, ftp_user, ftp_pass)
     if sftp and ssh:
         # 上传文件
         sftp_upload(sftp, zip_path, '/' + ftp_params[args['user']]['modeler'] + '/'+zip_file)
     # 关闭连接
     sftp_close(sftp, ssh)
+    if cdq:
+        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
+        sftp_upload(sftp, zip_path, '/cdq' + '/'+zip_file)
+        sftp_close(sftp, ssh)
+    if dq:
+        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
+        sftp_upload(sftp, zip_path, '/dq' + '/' + zip_file)
+        sftp_close(sftp, ssh)
+    if zq:
+        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
+        sftp_upload(sftp, zip_path, '/zq' + '/' + zip_file)
+        sftp_close(sftp, ssh)
     shutil.rmtree(os.path.dirname(zip_path))
 
 def upload_ftp(zip_path, zip_file, args):
@@ -161,11 +174,14 @@ def get_nwp_from_ftp():
         logger.info(args)
         df = get_data_from_mongo(args)
         df['date_time'] = pd.to_datetime(df['date_time'])
-        df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
-        # 2. 将预测结果保存成csv临时文件,命名压缩
-        zip_path, zip_file = zip_temp_file(df, args)
-        # 3. 上传到指定的FTP服务器中
-        upload_sftp(zip_path, zip_file, args)
+        dfs = df.groupby('farm_id')
+        for farm_id, df in dfs:
+            cdq, dq, zq = df.loc[df.index[0], ['cdq', 'dq', 'zq']]
+            df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
+            # 2. 将预测结果保存成csv临时文件,命名压缩
+            zip_path, zip_file = zip_temp_file(df, args)
+            # 3. 上传到指定的FTP服务器中
+            upload_sftp(zip_path, zip_file, args, cdq, dq, zq)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()