123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :data_nwp_ftp.py
- # @Time :2024/12/26 08:38
- # @Author :David
- # @Company: shenyang JY
- from datetime import timedelta
- from ftplib import FTP
- import threading, glob, pytz
- import pandas as pd
- from pytz import timezone
- from flask import Flask,request,jsonify
- import time, datetime, os, traceback, re
- import zipfile, tempfile, shutil, fnmatch
- from common.database_dml import insert_data_into_mongo
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.triggers.cron import CronTrigger
- from common.logs import Log
# Module-wide logger taken from the project's Log wrapper, created under the name 'data-processing'.
logger = Log('data-processing').logger
# Flask application object; the /data_nwp_ftp route in this file is registered on it.
app = Flask('data_nwp_ftp——service')
def update_thread():
    """Run ``start_jobs`` on its own thread and return without waiting for it."""
    threading.Thread(target=start_jobs).start()
def start_jobs():
    """Create and start the background scheduler that pulls NWP archives from FTP.

    One cron job is registered: ``download_zip_files_from_ftp`` is invoked
    (with no arguments) at minutes 0, 10, 20, 30, 40 and 50 of the hours
    1, 2, 5, 6, 11, 12, 13, 17 and 18, Asia/Shanghai time.
    """
    # The timezone is passed as a keyword option. A plain dict handed to
    # ``configure()`` is filtered by the "apscheduler." key prefix, so an
    # unprefixed {'timezone': ...} entry would be silently ignored.
    scheduler = BackgroundScheduler(timezone=timezone("Asia/Shanghai"))
    trigger = CronTrigger(
        hour='1,2,5,6,11,12,13,17,18',  # hours in which the job may fire
        minute='0-50/10',               # every 10 minutes: :00, :10, ..., :50
        timezone='Asia/Shanghai',       # evaluate the cron fields in Shanghai time
    )
    scheduler.add_job(func=download_zip_files_from_ftp, trigger=trigger)
    scheduler.start()
def match_date(date, filename):
    """Tell whether the date embedded in ``filename`` is on or before ``date``.

    Args:
        date: Cut-off day as a ``YYYYMMDD`` string.
        filename: File name to inspect; the first run of eight consecutive
            digits found in it is interpreted as a ``YYYYMMDD`` date.

    Returns:
        True when the embedded date is less than or equal to ``date``.
        False when it is later, when the name contains no eight-digit run,
        or when that run is not a valid calendar date.

    Raises:
        ValueError: If ``date`` itself is not a valid ``YYYYMMDD`` string.
    """
    given_date = datetime.datetime.strptime(date, '%Y%m%d')
    match = re.search(r'(\d{8})', filename)
    if match is None:
        return False
    try:
        filename_date = datetime.datetime.strptime(match.group(0), '%Y%m%d')
    except ValueError:
        # Eight digits that do not form a real date (e.g. "99999999"):
        # treat the file as not matching instead of aborting the caller.
        return False
    return filename_date <= given_date
def delete_zip_files(date):
    """Delete cached ZIP and CSV files whose embedded date is on or before ``date``.

    The local cache directory (``ftp_params['xxl']['local_dir']``) is walked
    recursively. In every visited directory, ``*.zip`` files are removed; in
    every sub-directory of a visited directory, ``*.csv`` entries are removed.
    A file qualifies only when ``match_date(date, name)`` is truthy.

    Deletion is best-effort: an ``OSError`` on one file is printed and the
    sweep continues with the next file.

    Args:
        date: Cut-off day as a ``YYYYMMDD`` string.
    """

    def _remove(path):
        # Best-effort removal; report the outcome either way.
        try:
            os.remove(path)
            print(f"Deleted file: {path}")
        except OSError as e:
            print(f"Error deleting file {path}: {e.strerror}")

    xxl_path = ftp_params['xxl']['local_dir']
    for root, dirs, files in os.walk(xxl_path):
        for filename in files:
            if fnmatch.fnmatch(filename, '*.zip') and match_date(date, filename):
                # Join with ``root`` (the directory currently being visited),
                # not the walk's top directory, so archives located in
                # sub-directories resolve to a path that actually exists.
                _remove(os.path.join(root, filename))
        for farm_dir in dirs:
            target_dir_path = os.path.join(root, farm_dir)
            for file_name in os.listdir(target_dir_path):
                if fnmatch.fnmatch(file_name, '*.csv') and match_date(date, file_name):
                    _remove(os.path.join(target_dir_path, file_name))
def get_moment_next(schedule_dt=None):
    """Map a point in time to a ``(date, moment)`` pair.

    Args:
        schedule_dt: Optional timestamp; ``str(schedule_dt)`` must have the
            form ``YYYY-MM-DD HH:MM:SS``. When omitted, the current time in
            the Asia/Shanghai timezone is used.

    Returns:
        A tuple ``(date, moment)`` where ``date`` is the day of the timestamp
        as ``YYYYMMDD`` and ``moment`` is chosen from the hour:

        * 00-02 -> ``'00'``
        * 03-06 -> ``'06'``
        * 07-12 -> ``'12'``
        * 13-18 -> ``'18'``
        * 19-23 -> ``'00'`` (``date`` is still the same day)
    """
    if schedule_dt is None:
        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    else:
        now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')

    date = now.strftime('%Y%m%d')
    current_hour = now.hour

    # Hours past the last bound (19-23) fall through to '00'.
    moment = '00'
    for last_hour, label in ((2, '00'), (6, '06'), (12, '12'), (18, '18')):
        if current_hour <= last_hour:
            moment = label
            break
    return date, moment
def get_previous_moment(original_date, original_moment):
    """Return the ``(date, moment)`` pair for six hours before the given one.

    Args:
        original_date: Day as a ``YYYYMMDD`` string.
        original_moment: Hour of the day as a two-digit string (parsed with ``%H``).

    Returns:
        The tuple produced by ``get_moment_next`` for the timestamp that lies
        six hours before ``original_date`` + ``original_moment``.
    """
    shanghai = timezone('Asia/Shanghai')
    stamp = datetime.datetime.strptime(f"{original_date}{original_moment}", "%Y%m%d%H")
    # Attach the Shanghai timezone, then step back by one 6-hour interval.
    shifted = shanghai.localize(stamp) - datetime.timedelta(hours=6)
    # get_moment_next expects a 'YYYY-MM-DD HH:MM:SS' string for schedule_dt.
    return get_moment_next(schedule_dt=shifted.strftime('%Y-%m-%d %H:%M:%S'))
def download_zip_files_from_ftp(moment=None):
    """Download, unpack and prune the NWP archives for one forecast moment.

    Steps:
      1. Log in to the FTP server described by ``ftp_params['xxl']`` and list
         the remote directory.
      2. Download every file matching ``meteoforce_<date><moment>_*.zip`` that
         is not already present in the local cache directory.
      3. Extract each newly downloaded archive into the local cache directory.
      4. Call ``delete_zip_files`` with the date two days before today.

    Args:
        moment: Two-digit moment string (e.g. ``'06'``) combined with today's
            date in Asia/Shanghai time. When ``None``, both the date and the
            moment are taken from ``get_moment_next()``.
    """
    now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
    date = now.strftime("%Y%m%d")
    date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
    if moment is None:
        date, moment = get_moment_next()

    params = ftp_params['xxl']
    ftp_host, ftp_user, ftp_password = params['host'], params['user'], params['password']
    remote_dir, local_dir = params['remote_dir'], params['local_dir']
    zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'

    # Make sure the cache directory exists before the first open() call.
    os.makedirs(local_dir, exist_ok=True)

    zip_file_path = []
    with FTP(ftp_host) as ftp:
        ftp.login(user=ftp_user, passwd=ftp_password)
        ftp.cwd(remote_dir)
        for file_name in ftp.nlst():
            if not fnmatch.fnmatch(file_name, zip_extension):
                continue
            remote_file_path = os.path.join(remote_dir, file_name)
            local_file_path = os.path.join(local_dir, file_name)
            if os.path.isfile(local_file_path):
                # Already fetched by an earlier run.
                continue
            # Download under a temporary name and rename only on success.
            # Otherwise an interrupted transfer would leave a truncated file
            # that the isfile() check above treats as "already downloaded".
            partial_path = local_file_path + '.part'
            logger.info(f"Downloading {remote_file_path} to {local_file_path}")
            try:
                with open(partial_path, 'wb') as local_file:
                    ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
            except Exception:
                if os.path.exists(partial_path):
                    os.remove(partial_path)
                raise
            os.replace(partial_path, local_file_path)
            logger.info(f"Downloaded {file_name}")
            zip_file_path.append(local_file_path)

    # Unpack every archive fetched in this run into the cache directory.
    for zip_file_p in zip_file_path:
        try:
            with zipfile.ZipFile(zip_file_p, 'r') as zip_ref:
                zip_ref.extractall(local_dir)
        except zipfile.BadZipFile:
            # Drop the unreadable archive so a later run can fetch it again,
            # and keep going so the remaining archives and the cleanup below
            # are not skipped.
            logger.info(f"Corrupt zip archive removed: {zip_file_p}")
            os.remove(zip_file_p)

    # Remove cached files dated two days ago or earlier.
    delete_zip_files(date_2)
def _is_truthy_flag(value):
    """Interpret a flag that may be a real bool or text from an HTTP form."""
    if isinstance(value, str):
        # Form values are strings, so "False"/"0"/"no" must not count as true.
        return value.strip().lower() not in ('', '0', 'false', 'no', 'n', 'f', 'off', 'none', 'null')
    return bool(value)


def select_file_to_mongo(args, date, moment):
    """Load one farm's NWP CSV file(s) for a forecast moment and store them.

    The local cache directory is searched for a sub-directory named after the
    farm. Inside it the files ``meteoforce_<farmId>_<date><moment>_weather.csv``
    and ``..._power.csv`` are looked up. When ``isDq`` is set and both files
    exist they are merged on ``date_time`` (after dropping ``farm_id`` from the
    power table); otherwise only the weather table is used. The result is cut
    to the requested day window by ``select_dx_from_nwp`` and handed to
    ``insert_data_into_mongo``.

    Args:
        args: Request parameters; must contain ``farmId`` and ``isDq`` and is
            passed on unchanged to ``select_dx_from_nwp`` and
            ``insert_data_into_mongo``. ``isDq`` may be a bool or text such as
            ``"true"`` / ``"False"``.
        date: Day as a ``YYYYMMDD`` string.
        moment: Moment suffix appended to ``date`` in the file name.

    Returns:
        False when no weather file was found, True after data was stored.
    """
    farmId = args['farmId']
    isDq = _is_truthy_flag(args['isDq'])
    csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
    csv_file_weather = csv_file_format.replace('*', 'weather')
    csv_file_power = csv_file_format.replace('*', 'power')
    csv_weather_path, csv_power_path = False, False

    # Locate the farm directory and pick up the weather / power files in it.
    for root, dirs, files in os.walk(ftp_params['xxl']['local_dir']):
        if farmId in dirs:
            target_dir_path = os.path.join(root, farmId)
            for file_name in os.listdir(target_dir_path):
                csv_file_path = os.path.join(target_dir_path, file_name)
                if fnmatch.fnmatch(file_name, csv_file_weather):
                    csv_weather_path = csv_file_path
                    logger.info("***找到nwp:{}***".format(csv_weather_path))
                if fnmatch.fnmatch(file_name, csv_file_power):
                    csv_power_path = csv_file_path
                    logger.info("***找到power:{}***".format(csv_power_path))
        if csv_weather_path or csv_power_path:
            break

    if csv_weather_path is False:
        logger.info("获取nwp文件异常:找不到场站 {} nwp文件".format(farmId))
        return False

    weather = pd.read_csv(csv_weather_path)
    if isDq and csv_power_path:
        power = pd.read_csv(csv_power_path)
        power.drop(columns=['farm_id'], inplace=True)
        source = pd.merge(weather, power, on='date_time')
    else:
        if isDq:
            # Power file requested but missing: fall back to weather only.
            logger.info(f"CSV 文件 {csv_file_power} 在目标目录 {farmId} 中未找到")
        source = weather

    # Keep only the requested day window (D0-D13 by default), slicing once.
    df = select_dx_from_nwp(source, args)
    insert_data_into_mongo(df, args)
    return True
def select_dx_from_nwp(df, args):
    """Return the rows of ``df`` that fall inside the requested day window.

    The reference day comes from ``get_moment_next(args.get('dt'))``. The
    window runs from that day plus ``day_begin`` days to that day plus
    ``day_end`` days, both inclusive of the whole calendar day.

    Args:
        df: Table with a text ``date_time`` column; underscores in the values
            are replaced by spaces before parsing. The frame passed in is
            left unmodified.
        args: Parameters; optional keys are ``dt`` (forwarded to
            ``get_moment_next``), ``day_begin`` (default ``'D0'``) and
            ``day_end`` (default ``'D13'``). The first character of the two
            day keys is skipped and the rest is parsed as an integer.

    Returns:
        A new DataFrame with a fresh 0..n-1 index whose ``date_time`` column
        is formatted as ``YYYY-MM-DD HH:MM:SS`` text.
    """
    date, _ = get_moment_next(args.get('dt'))
    base_day = datetime.datetime.strptime(date, "%Y%m%d")
    date_begin = base_day + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
    date_end = base_day + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))

    # Work on a copy so the caller's frame keeps its columns and index.
    frame = df.copy()
    frame['date_time'] = pd.to_datetime(frame['date_time'].str.replace("_", " ", regex=False))
    frame = frame.set_index('date_time')
    # Slicing a DatetimeIndex with day strings includes the whole end day.
    frame = frame.loc[date_begin.strftime('%Y-%m-%d'): date_end.strftime('%Y-%m-%d')].reset_index(drop=False)
    frame['date_time'] = frame['date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
    return frame
# FTP connection settings, keyed by data-source name.
# Host, user and password can be overridden through the environment variables
# XXL_FTP_HOST, XXL_FTP_USER and XXL_FTP_PASSWORD, so deployments do not have
# to rely on values committed to the source tree. The literals below are the
# defaults used when a variable is not set.
# NOTE(review): consider rotating the credential and dropping the literal default.
ftp_params = {
    'xxl': {
        'host': os.environ.get('XXL_FTP_HOST', '39.107.246.215'),
        'user': os.environ.get('XXL_FTP_USER', 'jiayue'),
        'password': os.environ.get('XXL_FTP_PASSWORD', 'JYoguf2018'),
        'remote_dir': './',
        'local_dir': 'data_processing/cache/data/xxl',
    }
}
@app.route('/data_nwp_ftp', methods=['POST'])
def get_nwp_from_ftp():
    """Handle POST /data_nwp_ftp: store the newest available NWP data for a farm.

    The request parameters are read into a dict and used to pick a starting
    ``(date, moment)`` via ``get_moment_next(args.get('dt'))``. If no file is
    found for that moment, up to two earlier moments (each six hours before
    the previous one) are tried; the fallback stops once it leaves the
    starting date.

    Returns:
        A dict with ``success``, the received ``args``, ``start_time`` and
        ``end_time`` (local time, ``YYYY-MM-DD HH:MM:SS``) and, when an
        exception occurred, ``msg`` holding the traceback on a single line.
    """
    start_time = time.time()
    result = {}
    success = False
    args = {}
    try:
        # Request parameters (form and query string) as a plain dict.
        args = request.values.to_dict()
        logger.info(args)
        # The archives themselves are fetched by the scheduled FTP job; here
        # the already-extracted CSV files are located and written to MongoDB.
        date, moment = get_moment_next(args.get('dt'))
        logger.info("从日期{}时刻{}开始查找文件".format(date, moment))
        success = select_file_to_mongo(args, date, moment)
        new_date = date
        i = 1
        while not success and date == new_date and i <= 2:
            new_date, moment = get_previous_moment(date, moment)
            logger.info("未找到,从日期{}时刻{}开始查找文件".format(new_date, moment))
            success = select_file_to_mongo(args, new_date, moment)
            i += 1
    except Exception:
        # str.replace returns a new string; keep the flattened result so the
        # traceback is reported on one line.
        my_exception = traceback.format_exc().replace("\n", "\t")
        result['msg'] = my_exception
        logger.info("生产,获取原始nwp出错:{}".format(my_exception))
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    return result
if __name__ == "__main__":
    print("Program starts execution!")
    from waitress import serve
    # Start the scheduled FTP download job.
    update_thread()
    # Fetch all four moments of today once at start-up, one thread per moment.
    for moment in ['00', '06', '12', '18']:
        threading.Thread(target=download_zip_files_from_ftp, kwargs={'moment': moment}).start()
    # serve() blocks until the server shuts down, so announce the start first.
    print("server start!")
    serve(app, host="0.0.0.0", port=10102)
    # Manual debugging examples:
    # args = {"source": 'xxl', "date": '2024-12-27', 'moment': '06', 'farmId': 'J00645',
    #         'mongodb_database': 'db2', 'mongodb_write_table': 'j00645-d1', 'day_begin': 'D0',
    #         'day_end': 'D13', 'isDq': True}
    # download_zip_files_from_ftp(moment='06')
    # select_file_to_mongo(args, '20241227', '06')
    # delete_zip_files('20241225')
|