Przeglądaj źródła

Merge branch 'dev_awg' of anweiguo/algorithm_platform into dev_david

liudawei 3 miesięcy temu
rodzic
commit
369873574c

+ 6 - 1
common/processing_data_common.py

@@ -1,6 +1,6 @@
 import random
 from datetime import date, timedelta
-
+import pandas as pd
 def str_to_list(arg):
     if arg == '':
         return []
@@ -45,3 +45,8 @@ def check_nwp_data(nwp_df,features):
         message=''
     return message
 
+
def get_xxl_dq(farm_id, dt):
    """Load the cached "xxl" day-ahead (DQ) power forecast for one farm.

    Parameters
    ----------
    farm_id : str
        Farm identifier; also part of the cache directory and file name.
    dt : str
        Run date as ``YYYYMMDD``; the 06-hour file
        ``meteoforce_{farm_id}_{dt}06_power.csv`` is read.

    Returns
    -------
    pandas.DataFrame
        Only the ``farm_id``, ``date_time`` and ``power_forecast`` columns.

    Raises
    ------
    FileNotFoundError
        If the cached CSV for this farm/date does not exist.
    """
    # Cache layout is fixed by the data-processing service; the first CSV
    # column is a row index and is consumed by index_col=0.
    path_dir = f'data_processing/cache/data/xxl/{farm_id}/meteoforce_{farm_id}_{dt}06_power.csv'
    df = pd.read_csv(path_dir, index_col=0)
    return df[['farm_id', 'date_time', 'power_forecast']]

+ 83 - 0
data_processing/data_operation/data_tj_nwp_ftp.py

@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :data_nwp_ftp.py
+# @Time      :2024/12/26 08:38
+# @Author    :David
+# @Company: shenyang JY
+from ftplib import FTP
+import pandas as pd
+from flask import Flask,request
+import time, traceback, io
+from common.database_dml import insert_data_into_mongo
+from common.alert import send_message
+from common.logs import Log
+from datetime import datetime, timedelta
+logger = Log('data-processing').logger
+
+app = Flask('data_tj_nwp_ftp——service')
+
+
def connect_ftp_get_data(params):
    """Fetch one farm's Tianji NWP forecast CSV from the vendor FTP server.

    Parameters
    ----------
    params : dict
        Must contain ``dt`` (run date, ``YYYYMMDD``) and ``farmId``.

    Returns
    -------
    pandas.DataFrame
        The vendor CSV plus a ``dateTime`` column (``time(UTC)`` converted to
        Asia/Shanghai, formatted ``%Y-%m-%d %H:%M:%S``), filtered to rows
        from the day after ``dt`` onwards.

    Raises
    ------
    FileNotFoundError
        If the farm's CSV is missing from the remote run directory (an alert
        is sent first). The original code fell through here and crashed with
        an unrelated NameError on the unbound ``df``.
    """
    date_obj = datetime.strptime(params['dt'], '%Y%m%d')
    # Remote run directory is named after the previous day's 12Z cycle.
    date_file = (date_obj - timedelta(days=1)).strftime('%Y%m%d')
    # Keep only forecast rows from tomorrow onwards.
    d1 = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
    farm_id = params['farmId']

    # NOTE(review): credentials are hard-coded in source; move them to
    # configuration / a secrets store.
    ftp_host = '58.240.173.20'  # FTP server address
    ftp_port = 65321  # FTP server port (default would be 21)
    ftp_user = 'zktjcE'  # FTP user name
    ftp_pass = 'HAFuapDVyYV'  # FTP password
    ftp_directory = f'/contract/{date_file}12'  # target remote directory
    farm_csv = f'{farm_id}.csv'

    ftp = FTP()
    try:
        ftp.connect(ftp_host, ftp_port)
        ftp.login(ftp_user, ftp_pass)
        ftp.cwd(ftp_directory)
        # List the run directory and bail out early if our file is absent.
        files = ftp.nlst()
        if farm_csv not in files:
            send_message('FTP获取天机NWP数据接口组件', farm_id, '缺少NWP文件!')
            raise FileNotFoundError(f'{ftp_directory}/{farm_csv} not found on FTP server')
        # Download into memory and parse with pandas (no temp file needed).
        with io.BytesIO() as byte_io:
            ftp.retrbinary(f'RETR {farm_csv}', byte_io.write)
            byte_io.seek(0)
            df = pd.read_csv(byte_io)
    finally:
        # Always release the connection (it was leaked on any exception).
        try:
            ftp.quit()
        except Exception:
            ftp.close()

    df['dateTime'] = pd.to_datetime(df['time(UTC)']).dt.tz_localize('UTC').dt.tz_convert('Asia/Shanghai').dt.strftime(
        '%Y-%m-%d %H:%M:%S')
    return df[df['dateTime'] >= d1]
+
@app.route('/data_tj_nwp_ftp', methods=['POST'])
def data_tj_nwp_ftp():
    """HTTP entry point: pull Tianji NWP data from FTP and store it in MongoDB.

    Returns a dict with ``success`` (0/1), the request ``args``, start/end
    timestamps and, on failure, the flattened traceback in ``msg``.
    """
    # Record start time for the timing fields in the response.
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    try:
        args = request.values.to_dict()
        logger.info(args)
        df = connect_ftp_get_data(args)
        insert_data_into_mongo(df, args)
        success = 1
    except Exception:
        my_exception = traceback.format_exc()
        # str.replace returns a new string; the original discarded the result,
        # so the traceback was never actually flattened onto one line.
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
        logger.info("生产,获取原始天机nwp出错:{}".format(my_exception))
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    return result
+
+
+if __name__ == "__main__":
+    print("Program starts execution!")
+    # Serve the Flask app with waitress (production WSGI server).
+    from waitress import serve
+    serve(app, host="0.0.0.0", port=10106)
+    # NOTE(review): serve() blocks; this line only runs after shutdown.
+    print("server start!")

+ 21 - 19
models_processing/model_predict/model_prediction_lightgbm.py

@@ -10,6 +10,9 @@ from common.alert import send_message
 from datetime import datetime, timedelta
 import pytz
 from pytz import timezone
+
+from common.processing_data_common import get_xxl_dq
+
 app = Flask('model_prediction_lightgbm——service')
 def str_to_list(arg):
     if arg == '':
@@ -20,25 +23,25 @@ def str_to_list(arg):
 
 def forecast_data_distribution(pre_data,args):
     col_time = args['col_time']
-
-    # tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
+    farm_id = args['farmId']
+    dt = datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai")).strftime('%Y%m%d')
     tomorrow = (datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai")) + timedelta(days=1)).strftime('%Y-%m-%d')
-    field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure':'surfacePressure'}
+    field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure':'surfacePressure',
+                     'wd140m': 'tj_wd140','ws140m': 'tj_ws140','wd170m': 'tj_wd170','cldt': 'tj_tcc','wd70m': 'tj_wd70',
+                     'ws100m': 'tj_ws100','DSWRFsfc': 'tj_radiation','wd10m': 'tj_wd10','TMP2m': 'tj_t2','wd30m': 'tj_wd30',
+                     'ws30m': 'tj_ws30','rh2m': 'tj_rh','PRATEsfc': 'tj_pratesfc','ws170m': 'tj_ws170','wd50m': 'tj_wd50',
+                     'ws50m': 'tj_ws50','wd100m': 'tj_wd100','ws70m': 'tj_ws70','ws10m': 'tj_ws10','psz': 'tj_pressure',
+                     'cldl': 'tj_lcc','pres': 'tj_pres','dateTime':'date_time'}
     # 根据字段映射重命名列
     pre_data = pre_data.rename(columns=field_mapping)
 
     if len(pre_data)==0:
-        send_message('lightgbm预测组件', args['farmId'], '请注意:获取NWP数据为空,预测文件无法生成!')
-        result = pd.DataFrame({'farm_id':[], col_time:[], 'power_forecast':[]})
+        send_message('lightgbm预测组件', farm_id, '请注意:获取NWP数据为空,预测文件无法生成!')
+        result = get_xxl_dq(farm_id, dt)
 
     elif len(pre_data[pre_data[col_time].str.contains(tomorrow)])<96:
-        send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
-        start_time = pre_data[col_time].min()
-        end_time = pre_data[col_time].max()
-        date_range = pd.date_range(start=start_time, end=end_time, freq='15T').strftime('%Y-%m-%d %H:%M:%S').tolist()
-        df_date = pd.DataFrame({col_time:date_range})
-        result = pd.merge(df_date,pre_data,how='left',on=col_time).sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
-        result = result[['farm_id', 'date_time', 'power_forecast']]
+        send_message('lightgbm预测组件', farm_id, "日前数据记录缺失,不足96条,用DQ代替并补值!")
+        result = get_xxl_dq(farm_id, dt)
     else:
         df = pre_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
         mongodb_connection, mongodb_database, mongodb_model_table, model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", \
@@ -53,19 +56,18 @@ def forecast_data_distribution(pre_data,args):
             model = pickle.loads(model_binary)
             diff = set(model.feature_name()) - set(pre_data.columns)
             if len(diff) > 0:
-                send_message('lightgbm预测组件', args['farmId'], f'NWP特征列缺失,使用DQ代替!features:{diff}')
-                result = pre_data[['farm_id', 'date_time', 'power_forecast']]
+                send_message('lightgbm预测组件', farm_id, f'NWP特征列缺失,使用DQ代替!features:{diff}')
+                result = get_xxl_dq(farm_id, dt)
             else:
                 df['power_forecast'] = model.predict(df[model.feature_name()])
                 df.loc[df['power_forecast'] < 0, 'power_forecast'] = 0
-                # 添加小时列 把光夜间置为0
-                df["hour"] = pd.to_datetime(df["date_time"]).dt.hour
-                df.loc[(df["hour"] >= 20) | (df["hour"] < 6), 'power_forecast'] = 0
                 print("model predict result  successfully!")
+                if 'farm_id' not in df.columns:
+                    df['farm_id'] = farm_id
                 result = df[['farm_id', 'date_time', 'power_forecast']]
         else:
-            send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
-            result = pre_data[['farm_id', 'date_time', 'power_forecast']]
+            send_message('lightgbm预测组件', farm_id, "模型文件缺失,用DQ代替并补值!")
+            result = get_xxl_dq(farm_id, dt)
     result['power_forecast'] = round(result['power_forecast'],2)
     return result
 

+ 66 - 0
post_processing/pre_post_processing.py

@@ -0,0 +1,66 @@
+import pandas as pd
+from flask import Flask,request,jsonify
+import time
+import logging
+import traceback
+
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo
+
+app = Flask('post_processing——service')
+
+
+@app.route('/hello', methods=['POST'])
+def hello():
+    return jsonify(message='Hello, World!')
+
+
def predict_result_adjustment(df, args):
    """Scale forecast values by hour-of-day ratios.

    ``args['ratio']`` is a dict literal, as a string, mapping a multiplier to
    the list of two-digit hour strings it applies to, e.g.
    ``"{'1.1': ['08', '09'], '0.9': ['20']}"``.

    Parameters
    ----------
    df : pandas.DataFrame
        Forecast rows; ``args['col_time']`` is assumed to hold fixed-width
        ``YYYY-mm-dd HH:MM:SS`` strings (hour taken from chars 11:13 —
        confirm against callers). NOTE: ``args['col_adjust']`` is modified
        in place on *df*.
    args : dict
        Needs ``col_time``, ``col_adjust`` and ``ratio``.

    Returns
    -------
    pandas.DataFrame
        *df* without the temporary ``hour`` helper column.
    """
    import ast  # local import: only needed by this function

    col_time = args['col_time']
    col_adjust = args['col_adjust']
    # literal_eval instead of eval: args comes straight from an HTTP request,
    # so it must not be able to execute arbitrary code. Behavior is identical
    # for the expected dict-literal input.
    ratio_dict = ast.literal_eval(args['ratio'])
    # Two-digit hour extracted from the fixed-width timestamp string.
    df['hour'] = df[col_time].apply(lambda x: x[11:13])
    for rate, hours in ratio_dict.items():
        df.loc[df['hour'].isin(hours), col_adjust] *= float(rate)
    return df.drop('hour', axis=1)
+
+
@app.route('/pre_post_processing', methods=['POST'])
def data_join():
    """HTTP entry point: load data from MongoDB, apply the hour-based ratio
    adjustment and write the result back.

    Returns a dict with ``success`` (0/1), the request ``args``, start/end
    timestamps and, on failure, the flattened traceback in ``msg``.
    """
    # Record start time for the timing fields in the response.
    start_time = time.time()
    result = {}
    success = 0
    # Initialise before the try block so the response can always echo args
    # (previously a failure in to_dict() left `args` unbound -> NameError),
    # matching the sibling services' pattern.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        # NOTE(review): `logger` is only created in the __main__ block below;
        # this name is unresolved if the module is imported rather than run.
        logger.info(args)
        df_pre = get_data_from_mongo(args)
        res_df = predict_result_adjustment(df_pre, args)
        insert_data_into_mongo(res_df, args)
        success = 1
    except Exception:
        my_exception = traceback.format_exc()
        # str.replace returns a new string; the original discarded the result,
        # so the traceback was never actually flattened onto one line.
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
+
+
+if __name__=="__main__":
+    print("Program starts execution!")
+    # Root logging config; `logger` is looked up by the request handler above,
+    # so it only exists when the module is run directly.
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("post_processing")
+    # Serve the Flask app with waitress (production WSGI server); serve() blocks.
+    from waitress import serve
+    serve(app, host="0.0.0.0", port=10107)
+    print("server start!")
+    
+   
+    

+ 2 - 0
run_all.py

@@ -22,6 +22,8 @@ services = [
     ("data_processing/data_operation/data_nwp_ftp.py", 10102),
     ("models_processing/model_train/model_training_bp.py", 10103),
     ("models_processing/model_predict/model_prediction_bp.py", 10104),
+    ("data_processing/data_operation/data_tj_nwp_ftp.py", 10106),
+    ("post_processing/pre_post_processing.py", 10107),
 ]
 
 # 获取当前脚本所在的根目录