Browse Source

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei 4 months ago
parent
commit
3a46d5effd

+ 3 - 1
data_processing/data_operation/data_nwp_ftp.py

@@ -138,12 +138,14 @@ def select_file_to_mongo(args):
     power = pd.read_csv(csv_power_path) if csv_power_path else None
     power = pd.read_csv(csv_power_path) if csv_power_path else None
     if isDq:
     if isDq:
         if csv_weather_path and csv_power_path:
         if csv_weather_path and csv_power_path:
-            power.drop(columns=['farm_id'])
+            power.drop(columns=['farm_id'], inplace=True)
             weather_power = pd.merge(weather, power, on='date_time')
             weather_power = pd.merge(weather, power, on='date_time')
             # 截取D0-D13时段数据
             # 截取D0-D13时段数据
             df = select_dx_from_nwp(weather_power, args)
             df = select_dx_from_nwp(weather_power, args)
             insert_data_into_mongo(df, args)
             insert_data_into_mongo(df, args)
         else:
         else:
+            df = select_dx_from_nwp(weather, args)
+            insert_data_into_mongo(df, args)
             logger.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farmId} 中未找到")
             logger.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farmId} 中未找到")
     else:
     else:
         if csv_weather_path:
         if csv_weather_path:

+ 8 - 10
data_processing/data_operation/pre_data_ftp.py

@@ -9,8 +9,10 @@ import pandas as pd
 from flask import Flask, request, jsonify
 from flask import Flask, request, jsonify
 import time, datetime, os, traceback, pytz
 import time, datetime, os, traceback, pytz
 from pytz import timezone
 from pytz import timezone
-import logging, zipfile, tempfile, shutil
+import zipfile, tempfile, shutil
 from common.database_dml import get_data_from_mongo
 from common.database_dml import get_data_from_mongo
+from common.logs import Log
+logger = Log('data-processing').logger
 
 
 app = Flask('pre_data_ftp——service')
 app = Flask('pre_data_ftp——service')
 
 
@@ -58,8 +60,8 @@ def upload_ftp(zip_path, zip_file, args):
     # 创建 FTP 连接
     # 创建 FTP 连接
     ftp = FTP()
     ftp = FTP()
 
 
-    # 使用动模式
-    ftp.set_pasv(False)
+    # 使用动模式
+    ftp.set_pasv(True)
 
 
     # 连接到 FTP 服务器并指定端口
     # 连接到 FTP 服务器并指定端口
     ftp.connect(ftp_host, ftp_port)  # 使用自定义端口号
     ftp.connect(ftp_host, ftp_port)  # 使用自定义端口号
@@ -75,7 +77,7 @@ def upload_ftp(zip_path, zip_file, args):
     ftp.quit()
     ftp.quit()
     shutil.rmtree(os.path.dirname(zip_path))
     shutil.rmtree(os.path.dirname(zip_path))
     # os.remove(zip_path)
     # os.remove(zip_path)
-    print("File uploaded successfully")
+    logger.info("File uploaded successfully")
 
 
 @app.route('/pre_data_ftp', methods=['POST'])
 @app.route('/pre_data_ftp', methods=['POST'])
 def get_nwp_from_ftp():
 def get_nwp_from_ftp():
@@ -84,11 +86,9 @@ def get_nwp_from_ftp():
     result = {}
     result = {}
     success = 0
     success = 0
     args = {}
     args = {}
-    print("Program starts execution!")
     try:
     try:
         args = request.values.to_dict()
         args = request.values.to_dict()
         # 1. 获取 mongo 中的预测结果
         # 1. 获取 mongo 中的预测结果
-        print('args', args)
         logger.info(args)
         logger.info(args)
         df = get_data_from_mongo(args)
         df = get_data_from_mongo(args)
         df['date_time'] = pd.to_datetime(df['date_time'])
         df['date_time'] = pd.to_datetime(df['date_time'])
@@ -102,25 +102,23 @@ def get_nwp_from_ftp():
         my_exception = traceback.format_exc()
         my_exception = traceback.format_exc()
         my_exception.replace("\n", "\t")
         my_exception.replace("\n", "\t")
         result['msg'] = my_exception
         result['msg'] = my_exception
+        logger.info("预测文件下发ftp出错:{}".format(my_exception))
     end_time = time.time()
     end_time = time.time()
     result['success'] = success
     result['success'] = success
     result['args'] = args
     result['args'] = args
     result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
     result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
     result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
     result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
-    print("Program execution ends!")
     return result
     return result
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     print("Program starts execution!")
     print("Program starts execution!")
-    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    logger = logging.getLogger("pre_data_ftp")
     from waitress import serve
     from waitress import serve
 
 
     serve(app, host="0.0.0.0", port=10101)
     serve(app, host="0.0.0.0", port=10101)
     print("server start!")
     print("server start!")
 
 
     # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
     # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
-    #         'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'J00645_tmp_w', 'day_begin':'D1',
+    #         'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'j00645_ori_res', 'day_begin':'D1',
     #         'day_end': 'D1'}
     #         'day_end': 'D1'}
     # df = get_data_from_mongo(args)
     # df = get_data_from_mongo(args)
     # df.rename(columns={'dateTime': 'date_time'}, inplace=True)
     # df.rename(columns={'dateTime': 'date_time'}, inplace=True)