Преглед изворни кода

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei пре 1 месец
родитељ
комит
ac6d7325f4

+ 10 - 7
data_processing/data_operation/pre_data_ftp.py

@@ -64,7 +64,7 @@ def get_moment_next(schedule_dt=False):
         moment = '06'
         moment = '06'
     return date, moment
     return date, moment
 
 
-def zip_temp_file(df, args):
+def zip_temp_file(df, args, tag):
     def zip_folder(folder_path, zip_filePath):
     def zip_folder(folder_path, zip_filePath):
         zip_file = zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED)
         zip_file = zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED)
         for root, dirs, files in os.walk(folder_path):
         for root, dirs, files in os.walk(folder_path):
@@ -75,11 +75,11 @@ def zip_temp_file(df, args):
     temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
     temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
     date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
     date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
     modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
     modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
-    csv_file = 'jy_{}.{}.{}_{}_{}{}.csv'.format(modeler, model, version, farmId, date, moment)
+    csv_file = 'jy_{}.{}.{}_{}_{}{}_{}.csv'.format(modeler, model, version, farmId, date, moment, tag)
     csv_path = os.path.join(temp_dir, farmId, csv_file)
     csv_path = os.path.join(temp_dir, farmId, csv_file)
     os.makedirs(os.path.dirname(csv_path), exist_ok=True)
     os.makedirs(os.path.dirname(csv_path), exist_ok=True)
     df.to_csv(csv_path, index=False)
     df.to_csv(csv_path, index=False)
-    zip_file = 'jy_{}.{}.{}_{}{}.zip'.format(modeler, model, version, date, moment)
+    zip_file = 'jy_{}.{}.{}_{}{}_{}.zip'.format(modeler, model, version, date, moment, tag)
     zip_path = os.path.join(tem_dir_zip, zip_file)
     zip_path = os.path.join(tem_dir_zip, zip_file)
     zip_folder(temp_dir, zip_path)
     zip_folder(temp_dir, zip_path)
     shutil.rmtree(temp_dir)
     shutil.rmtree(temp_dir)
@@ -163,12 +163,15 @@ def get_nwp_from_ftp():
         df = get_data_from_mongo(args)
         df = get_data_from_mongo(args)
         df['date_time'] = pd.to_datetime(df['date_time'])
         df['date_time'] = pd.to_datetime(df['date_time'])
         dfs = df.groupby('farm_id')
         dfs = df.groupby('farm_id')
+        model_types = {'cdq': args.get('cdq', 0), 'dq': args.get('dq', 0), 'zcq': args.get('zcq', 0)}
         for farm_id, df in dfs:
         for farm_id, df in dfs:
             df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
             df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
-            # 2. 将预测结果保存成csv临时文件,命名压缩
-            zip_path, zip_file = zip_temp_file(df, args)
-            # 3. 上传到指定的FTP服务器中
-            upload_sftp(zip_path, zip_file, args)
+            for tag, status in model_types.items():
+                if int(status) == 1:
+                    # 2. 将预测结果保存成csv临时文件,命名压缩
+                    zip_path, zip_file = zip_temp_file(df, args, tag)
+                    # 3. 上传到指定的FTP服务器中
+                    upload_sftp(zip_path, zip_file, args)
         success = 1
         success = 1
     except Exception as e:
     except Exception as e:
         my_exception = traceback.format_exc()
         my_exception = traceback.format_exc()

+ 4 - 4
data_processing/data_operation/pre_prod_ftp.py

@@ -142,7 +142,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
                 continue
                 continue
 
 
             for model_version in mappings[engineer]:
             for model_version in mappings[engineer]:
-                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip"
+                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_{model_type}.zip"
                 remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
                 remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
                 local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
                 local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
 
 
@@ -209,11 +209,11 @@ def upload_to_sftp(local_path: str, target_dir: str) -> bool:
             transport.close()
             transport.close()
 
 
 
 
-def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
+def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir, model_type):
     """处理所有下载的ZIP文件并收集场站目录"""
     """处理所有下载的ZIP文件并收集场站目录"""
     for engineer in mappings:
     for engineer in mappings:
         for model_version in mappings[engineer]:
         for model_version in mappings[engineer]:
-            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
+            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_{model_type}.zip"
             zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
             zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
             station_codes = mappings[engineer][model_version]
             station_codes = mappings[engineer][model_version]
 
 
@@ -300,7 +300,7 @@ def prod_data_handler(mappings, model_type):
         if download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
         if download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
 
 
             # 处理下载的文件
             # 处理下载的文件
-            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
+            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir, model_type)
 
 
             # 创建最终ZIP
             # 创建最终ZIP
             zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
             zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)