David 4 місяців тому
батько
коміт
9db98d8792

+ 10 - 0
data_processing/cache/data/xxl/init.py

@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :init.py.py
+# @Time      :2024/12/27 15:13
+# @Author    :David
+# @Company: shenyang JY
+
+
+if __name__ == "__main__":
+    run_code = 0

+ 228 - 0
data_processing/data_operation/data_nwp_ftp.py

@@ -0,0 +1,228 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :data_nwp_ftp.py
+# @Time      :2024/12/26 08:38
+# @Author    :David
+# @Company: shenyang JY
+from datetime import timedelta
+from ftplib import FTP
+import threading, glob
+import pandas as pd
+from pytz import timezone
+from flask import Flask,request,jsonify
+import time, datetime, os, traceback, re
+import logging, zipfile, tempfile, shutil, fnmatch
+from common.database_dml import insert_data_into_mongo
+from apscheduler.schedulers.background import BackgroundScheduler
+
+
+app = Flask('data_nwp_ftp——service')
+
+
+def update_thread():
+    thread = threading.Thread(target=start_jobs)
+    thread.start()
+
+
+def start_jobs():
+    scheduler = BackgroundScheduler()
+    scheduler.configure({'timezone': timezone("Asia/Shanghai")})
+    scheduler.add_job(func=download_zip_files_from_ftp, args=['00'], trigger="cron", hour=0, minute=0)
+    scheduler.add_job(func=download_zip_files_from_ftp, args=['06'], trigger="cron", hour=6, minute=0)
+    scheduler.add_job(func=download_zip_files_from_ftp, args=['12'], trigger="cron", hour=12, minute=0)
+    scheduler.add_job(func=download_zip_files_from_ftp, args=['18'], trigger="cron", hour=18, minute=0)
+    scheduler.start()
+
+def match_date(date, filename):
+    given_date = datetime.datetime.strptime(date, '%Y%m%d')
+    date_pattern = re.compile(r'(\d{8})')
+    match = date_pattern.search(filename)
+    if match:
+        filename_str = match.group(0)
+        filename_date = datetime.datetime.strptime(filename_str, '%Y%m%d')
+        if filename_date <= given_date:
+            return True
+
+def delete_zip_files(date):
+    xxl_path = ftp_params['xxl']['local_dir']
+    # 遍历文件夹中的所有文件
+    for root, dirs, files in os.walk(xxl_path):
+        for filename in files:
+            # 检查文件名是否以 'meteo_date_' 开头且以 '.zip' 结尾
+            if fnmatch.fnmatch(filename, f'*.zip') and match_date(date, filename):
+                # 构建文件的完整路径
+                file_path = os.path.join(xxl_path, filename)
+                # 删除文件
+                try:
+                    os.remove(file_path)
+                    print(f"Deleted file: {file_path}")
+                except OSError as e:
+                    print(f"Error deleting file {file_path}: {e.strerror}")
+        for farmId in dirs:
+            target_dir_path = os.path.join(root, farmId)
+            for file_name in os.listdir(target_dir_path):
+                csv_file_path = os.path.join(target_dir_path, file_name)
+                if fnmatch.fnmatch(csv_file_path, f'*.csv') and match_date(date, file_name):
+                    try:
+                        os.remove(csv_file_path)
+                        print(f"Deleted file: {csv_file_path}")
+                    except OSError as e:
+                        print(f"Error deleting file {csv_file_path}: {e.strerror}")
+
+def download_zip_files_from_ftp(hour):
+    date = datetime.datetime.now().strftime("%Y%m%d")
+    date_2 = (datetime.datetime.now() - timedelta(days=2)).strftime("%Y%m%d")
+    host, moment  = 'xxl', hour
+    ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
+    zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
+    zip_file_path = []
+    # 连接到FTP服务器
+    with FTP(ftp_host) as ftp:
+        ftp.login(user=ftp_user, passwd=ftp_password)
+
+        # 切换到远程目录
+        ftp.cwd(remote_dir)
+
+        # 获取远程目录中的文件和目录列表
+        files = ftp.nlst()
+
+        # 遍历文件列表,找到ZIP文件并下载
+        for file_name in files:
+            if fnmatch.fnmatch(file_name, zip_extension):
+                remote_file_path = os.path.join(remote_dir, file_name)
+                local_file_path = os.path.join(local_dir, file_name)
+
+                with open(local_file_path, 'wb') as local_file:
+                    logging.info(f"Downloading {remote_file_path} to {local_file_path}")
+                    ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
+
+                logging.info(f"Downloaded {file_name}")
+                zip_file_path.append(local_file_path)
+    # 解压 ZIP 文件到临时目录
+    for zip_file_p in zip_file_path:
+        with zipfile.ZipFile(zip_file_p, 'r') as zip_ref:
+            zip_ref.extractall(local_dir)
+    # 删除前天之前所有 ZIP 文件
+    delete_zip_files(date_2)
+
+def select_file_to_mongo(args):
+    date, moment, farmId, isDq = args['date'], args['moment'], args['farmId'], args['isDq']
+    date = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00').strftime("%Y%m%d")
+    csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
+    csv_file_weather = csv_file_format.replace('*', 'weather')
+    csv_file_power = csv_file_format.replace('*', 'power')
+    csv_weather_path, csv_power_path = False, False
+    # 查找目标目录并读取 CSV 文件
+    for root, dirs, files in os.walk(ftp_params['xxl']['local_dir']):
+        if farmId in dirs:
+            target_dir_path = os.path.join(root, farmId)
+            for file_name in os.listdir(target_dir_path):
+                csv_file_path = os.path.join(target_dir_path, file_name)
+                if fnmatch.fnmatch(file_name, csv_file_weather):
+                    csv_weather_path = csv_file_path
+                if fnmatch.fnmatch(file_name, csv_file_power):
+                    csv_power_path = csv_file_path
+            if csv_weather_path or csv_power_path:
+                break
+
+    # 使用 pandas 读取 CSV 文件
+    weather = pd.read_csv(csv_weather_path) if csv_power_path else None
+    power = pd.read_csv(csv_power_path) if csv_power_path else None
+    if isDq:
+        if csv_weather_path and csv_power_path:
+            power.drop(columns=['farm_id'])
+            weather_power = pd.merge(weather, power, on='date_time')
+            # 截取D0-D13时段数据
+            df = select_dx_from_nwp(weather_power, args)
+            insert_data_into_mongo(df, args)
+        else:
+            logging.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farmId} 中未找到")
+    else:
+        if csv_weather_path:
+            weather = select_dx_from_nwp(weather, args)
+            # 截取D0-D13时段数据
+            df = select_dx_from_nwp(weather, args)
+            insert_data_into_mongo(df, args)
+        else:
+            logging.info(f"CSV 文件 {csv_file_weather} 在目标目录 {farmId} 中未找到")
+
+
+def select_dx_from_nwp(df, args):
+    date = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00')
+    date_begin = date + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
+    date_end = date + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))
+    df['date_time'] = df['date_time'].str.replace("_", " ")
+    df['date_time'] = pd.to_datetime(df['date_time'])
+    df.set_index('date_time', inplace=True)
+    df = df.loc[date_begin.strftime('%Y-%m-%d'): date_end.strftime('%Y-%m-%d')].reset_index(drop=False)
+    df.reset_index(drop=True, inplace=True)
+    df['date_time'] = df['date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
+    return df
+
+# 示例使用
+ftp_params = {
+    'xxl' : {
+        'host' : '39.107.246.215',
+        'user' : 'jiayue',
+        'password' : 'JYoguf2018',
+        'remote_dir' : './',
+        'local_dir': '../cache/data/xxl'
+    }
+}
+
+
+@app.route('/data_nwp_ftp', methods=['POST'])
+def get_nwp_from_ftp():
+    # 获取程序开始时间
+    start_time = time.time()
+    result = {}
+    success = 0
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        # 1. 获取参数:日期,数据源,时刻,D0-9,场站ID,存储的 mongo 和表
+        print('args', args)
+        logger.info(args)
+        # 2. 连接FTP,从FTP服务器中获取指定参数的压缩文件(定时任务)
+        # 3. 解压压缩文件,将其存储到mongo中
+        select_file_to_mongo(args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc()
+        my_exception.replace("\n", "\t")
+        result['msg'] = my_exception
+    end_time = time.time()
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+
+if __name__ == "__main__":
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("data_nwp_ftp")
+    from waitress import serve
+    update_thread() #定时任务开启
+    current_time = datetime.datetime.now()
+    current_hour = current_time.hour
+    if current_hour >= 18:
+        current_hour = '18'
+    elif current_hour >= 12:
+        current_hour = '12'
+    elif current_hour >= 6:
+        current_hour = '06'
+    else:
+        current_hour = '00'
+    threading.Thread(target=download_zip_files_from_ftp, args=(current_hour,)).start()
+    serve(app, host="0.0.0.0", port=10102)
+    print("server start!")
+
+    # args = {"source": 'xxl', "date": '2024-12-27 00:00:00', 'moment': '06', 'farmId': 'J00645',
+    # 'mongodb_database': 'db2', 'mongodb_write_table': 'j00645-d1', 'day_begin':'D1',
+    #         'day_end': 'D1', 'isDq': True}
+    # download_zip_files_from_ftp()
+    # select_file_to_mongo(args)
+    # delete_zip_files('20241225')

+ 124 - 0
data_processing/data_operation/pre_data_ftp.py

@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :pre_data_ftp.py
+# @Time      :2024/12/26 14:18
+# @Author    :David
+# @Company: shenyang JY
+from ftplib import FTP
+import pandas as pd
+from flask import Flask,request,jsonify
+import time, datetime, os, traceback
+import logging, zipfile, tempfile, shutil
+from common.database_dml import get_data_from_mongo
+
+app = Flask('pre_data_ftp——service')
+
+ftp_params = {
+    'host' : '192.168.12.20',
+    'port': 32121,
+    'zip_mode': 'w',
+    'liudawei' : {
+        'user' : 'liudawei',
+        'password' : 'liudawei@123',
+        'modeler' : 'koi'
+    },
+    'anweiguo':{
+        'user' : 'anweiguo',
+        'password' : 'anweiguo@123',
+        'modeler' : 'seer'
+    }
+}
+
+def zip_temp_file(df, args):
+    def zip_folder(folder_path, zip_filePath):
+        zip_file = zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED)
+        for root, dirs, files in os.walk(folder_path):
+            for file in files:
+                file_path = os.path.join(root, file)
+                zip_file.write(file_path, os.path.relpath(file_path, folder_path))
+        zip_file.close()
+    temp_dir, tem_dir_zip = tempfile.mkdtemp(dir='../cache/data'), tempfile.mkdtemp(dir='../cache/data')
+    dt = df.loc[0, 'date_time'].strftime('%Y%m%d')
+    modeler, model, version, farmId, moment = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId'], args['moment']
+    csv_file = 'jy_{}.{}{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, dt, moment)
+    csv_path = os.path.join(temp_dir, farmId, csv_file)
+    os.makedirs(os.path.dirname(csv_path), exist_ok=True)
+    df.to_csv(csv_path, index=False)
+    zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, dt, moment)
+    zip_path = os.path.join(tem_dir_zip, zip_file)
+    zip_folder(temp_dir, zip_path)
+    shutil.rmtree(temp_dir)
+    return zip_path, zip_file
+
+def upload_ftp(zip_path, zip_file, args):
+    ftp_host, ftp_port, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port'], args['user'], ftp_params[args['user']]['password']
+    # 创建 FTP 连接
+    ftp = FTP()
+
+    # 使用主动模式
+    ftp.set_pasv(False)
+
+    # 连接到 FTP 服务器并指定端口
+    ftp.connect(ftp_host, ftp_port)  # 使用自定义端口号
+
+    # 登录到 FTP 服务器
+    ftp.login(ftp_user, ftp_pass)
+
+    # 上传文件
+    with open(zip_path, 'rb') as f:
+        ftp.storbinary('STOR /' + ftp_params[args['user']]['modeler'] + '/'+zip_file, f)
+
+    # 退出 FTP 连接
+    ftp.quit()
+    shutil.rmtree(os.path.dirname(zip_path))
+    # os.remove(zip_path)
+    print("File uploaded successfully")
+
+@app.route('/pre_data_ftp', methods=['POST'])
+def get_nwp_from_ftp():
+    # 获取程序开始时间
+    start_time = time.time()
+    result = {}
+    success = 0
+    args = {}
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        # 1. 获取 mongo 中的预测结果
+        print('args', args)
+        logger.info(args)
+        df = get_data_from_mongo(args)
+        # 2. 将预测结果保存成csv临时文件,命名压缩
+        zip_path, zip_file = zip_temp_file(df, args)
+        # 3. 上传到指定的FTP服务器中
+        upload_ftp(zip_path, zip_file, args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc()
+        my_exception.replace("\n", "\t")
+        result['msg'] = my_exception
+    end_time = time.time()
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+if __name__ == "__main__":
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("pre_data_ftp")
+    from waitress import serve
+
+    serve(app, host="0.0.0.0", port=10101)
+    print("server start!")
+
+    # args = {"user": 'anweiguo', 'model': 'Zone', 'version': 1.0, 'hour': '06',
+    #         'farmId': 'J00645', 'mongodb_database': 'db2', 'mongodb_read_table': 'J00645_tmp_w', 'day_begin':'D1',
+    #         'day_end': 'D1'}
+    # df = get_data_from_mongo(args)
+    # df.rename(columns={'dateTime': 'date_time'}, inplace=True)
+    # df['date_time'] = pd.to_datetime(df['date_time'])
+    # zip_path, zip_file = zip_temp_file(df, args)
+    # upload_ftp(zip_path, zip_file, args)