123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :data_nwp_ftp.py
- # @Time :2024/12/26 08:38
- # @Author :David
- # @Company: shenyang JY
- from datetime import timedelta
- from ftplib import FTP
- import threading, glob
- import pandas as pd
- from pytz import timezone
- from flask import Flask,request,jsonify
- import time, datetime, os, traceback, re
- import logging, zipfile, tempfile, shutil, fnmatch
- from common.database_dml import insert_data_into_mongo
- from apscheduler.schedulers.background import BackgroundScheduler
# Flask application object; the /data_nwp_ftp endpoint further down is registered on it.
app = Flask(import_name='data_nwp_ftp——service')
def update_thread():
    """Kick off start_jobs() on its own thread and return without waiting for it."""
    threading.Thread(target=start_jobs).start()
def start_jobs():
    """Create and start an APScheduler BackgroundScheduler that fetches NWP ZIPs.

    The scheduler runs in the Asia/Shanghai timezone.  One cron job is
    registered per forecast cycle ('00', '06', '12', '18'); each fires at
    minute 0 of the matching hour and calls
    ``download_zip_files_from_ftp(<cycle>)``.
    """
    cron_scheduler = BackgroundScheduler()
    cron_scheduler.configure({'timezone': timezone("Asia/Shanghai")})
    for cycle in ('00', '06', '12', '18'):
        # the cycle label doubles as the trigger hour ('06' -> 6)
        cron_scheduler.add_job(
            func=download_zip_files_from_ftp,
            args=[cycle],
            trigger="cron",
            hour=int(cycle),
            minute=0,
        )
    cron_scheduler.start()
def match_date(date, filename):
    """Return True if the first 8-digit date embedded in *filename* is on or before *date*.

    Args:
        date: reference day as a 'YYYYMMDD' string (inclusive bound).
        filename: file name to inspect, e.g. 'meteoforce_2024122606_x.zip';
            the first run of 8 consecutive digits is taken as the file's date.

    Returns:
        True when the embedded date is <= *date*.  False when it is later,
        when the name contains no 8-digit run, or when that run is not a valid
        calendar date.
    """
    given_date = datetime.datetime.strptime(date, '%Y%m%d')
    match = re.search(r'\d{8}', filename)
    if match is None:
        return False
    try:
        filename_date = datetime.datetime.strptime(match.group(0), '%Y%m%d')
    except ValueError:
        # An 8-digit run that is not a real date (e.g. '99999999') must not
        # abort the cleanup loop that calls this helper.
        return False
    return filename_date <= given_date
def delete_zip_files(date):
    """Delete cached NWP files whose embedded date is on or before *date*.

    Walks ``ftp_params['xxl']['local_dir']`` recursively and removes:

    * every ``*.zip`` file, at any depth, and
    * every ``*.csv`` file located in a sub-directory (not the top level),
      e.g. the per-farm directories the archives are extracted into,

    provided ``match_date(date, <file name>)`` is true.  Removal errors are
    printed and skipped so one failure does not stop the cleanup.

    Args:
        date: cutoff day as a 'YYYYMMDD' string (inclusive).
    """
    xxl_path = ftp_params['xxl']['local_dir']
    for root, _dirs, files in os.walk(xxl_path):
        # os.walk yields the top directory first, exactly as passed in;
        # CSVs directly in it are kept, only sub-directory CSVs are pruned.
        in_subdir = root != xxl_path
        for filename in files:
            is_zip = fnmatch.fnmatch(filename, '*.zip')
            is_csv = in_subdir and fnmatch.fnmatch(filename, '*.csv')
            if not (is_zip or is_csv) or not match_date(date, filename):
                continue
            # Join with the directory actually being walked so files found in
            # sub-directories resolve to their real location.
            file_path = os.path.join(root, filename)
            try:
                os.remove(file_path)
                print(f"Deleted file: {file_path}")
            except OSError as e:
                print(f"Error deleting file {file_path}: {e.strerror}")
def download_zip_files_from_ftp(hour):
    """Download today's NWP ZIP archives for one forecast cycle, unpack them, prune old files.

    Connects to the FTP server configured in ``ftp_params['xxl']``.  From its
    remote directory it downloads every file matching
    ``meteoforce_<YYYYMMDD><hour>_*.zip`` (today's date by the local clock)
    into the local cache directory, creating that directory if needed.  It
    then extracts each downloaded archive into the same directory.  Finally
    it calls ``delete_zip_files`` with the date two days ago.

    Args:
        hour: forecast cycle label, e.g. '06'; interpolated as-is into the
            file name pattern.
    """
    # read the clock once so both dates agree even around midnight
    now = datetime.datetime.now()
    date = now.strftime("%Y%m%d")
    date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")

    params = ftp_params['xxl']
    local_dir = params['local_dir']
    zip_pattern = f'meteoforce_{date}{hour}_*.zip'
    os.makedirs(local_dir, exist_ok=True)

    zip_file_paths = []
    with FTP(params['host']) as ftp:
        ftp.login(user=params['user'], passwd=params['password'])
        ftp.cwd(params['remote_dir'])
        for file_name in ftp.nlst():
            if not fnmatch.fnmatch(file_name, zip_pattern):
                continue
            local_file_path = os.path.join(local_dir, file_name)
            logging.info(f"Downloading {file_name} to {local_file_path}")
            try:
                with open(local_file_path, 'wb') as local_file:
                    # Already inside remote_dir after cwd(), so retrieve by
                    # bare name; prefixing remote_dir again would resolve a
                    # relative directory twice.
                    ftp.retrbinary(f'RETR {file_name}', local_file.write)
            except Exception:
                # don't leave a truncated archive behind for extraction
                if os.path.exists(local_file_path):
                    os.remove(local_file_path)
                raise
            logging.info(f"Downloaded {file_name}")
            zip_file_paths.append(local_file_path)

    # extract every downloaded archive into the local cache directory
    for zip_path in zip_file_paths:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(local_dir)

    # remove ZIP/CSV files dated two days ago or earlier
    delete_zip_files(date_2)
def _parse_flag(value):
    """Interpret a flag that may arrive as a bool or as request-form text.

    Form values are strings, and any non-empty string (including 'False') is
    truthy, so text is parsed explicitly; other types fall back to bool().
    """
    if isinstance(value, str):
        return value.strip().lower() in ('1', 'true', 'yes', 'y', 'on')
    return bool(value)


def _find_farm_csvs(farm_id, weather_name, power_name):
    """Locate the weather/power CSVs for *farm_id* in the local cache.

    Walks ``ftp_params['xxl']['local_dir']`` recursively, inspecting each
    directory named *farm_id*.  Stops at the first such directory containing
    at least one of the two files.

    Returns:
        (weather_path, power_path); each is None when not found.
    """
    weather_path = power_path = None
    for root, dirs, _files in os.walk(ftp_params['xxl']['local_dir']):
        if farm_id not in dirs:
            continue
        target_dir_path = os.path.join(root, farm_id)
        for file_name in os.listdir(target_dir_path):
            if fnmatch.fnmatch(file_name, weather_name):
                weather_path = os.path.join(target_dir_path, file_name)
            if fnmatch.fnmatch(file_name, power_name):
                power_path = os.path.join(target_dir_path, file_name)
        if weather_path or power_path:
            break
    return weather_path, power_path


def select_file_to_mongo(args):
    """Load one farm's cached NWP CSVs, trim them to the requested day range, store in Mongo.

    Args:
        args: request parameters.  Keys read here:
            'date'    -- run date formatted '%Y-%m-%d 00:00:00'
            'moment'  -- forecast cycle, e.g. '06'
            'farmId'  -- farm directory / file-name id, e.g. 'J00645'
            'isDq'    -- bool or text flag; when true, weather and power CSVs
                         are merged on 'date_time', otherwise only weather is used
        The whole dict is also passed on to select_dx_from_nwp (day_begin /
        day_end) and insert_data_into_mongo (presumably the Mongo target keys;
        verify against that helper).

    Missing CSVs are logged and nothing is inserted.
    """
    moment, farm_id = args['moment'], args['farmId']
    is_dq = _parse_flag(args['isDq'])
    date = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00').strftime("%Y%m%d")
    csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farm_id, date + str(moment))
    csv_file_weather = csv_file_format.replace('*', 'weather')
    csv_file_power = csv_file_format.replace('*', 'power')
    csv_weather_path, csv_power_path = _find_farm_csvs(farm_id, csv_file_weather, csv_file_power)

    if is_dq:
        if not (csv_weather_path and csv_power_path):
            logging.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farm_id} 中未找到")
            return
        weather = pd.read_csv(csv_weather_path)
        # DataFrame.drop returns a new frame, so the result must be kept.
        # Presumably both files carry farm_id; drop power's copy so the merge
        # keeps a single column instead of farm_id_x / farm_id_y.
        power = pd.read_csv(csv_power_path).drop(columns=['farm_id'], errors='ignore')
        weather_power = pd.merge(weather, power, on='date_time')
        # keep only the D0-D13 (or requested) window
        df = select_dx_from_nwp(weather_power, args)
    else:
        if not csv_weather_path:
            logging.info(f"CSV 文件 {csv_file_weather} 在目标目录 {farm_id} 中未找到")
            return
        # keep only the D0-D13 (or requested) window
        df = select_dx_from_nwp(pd.read_csv(csv_weather_path), args)
    insert_data_into_mongo(df, args)
def select_dx_from_nwp(df, args):
    """Return the rows of *df* whose date_time lies in the requested whole-day window.

    The window runs from ``date + day_begin`` to ``date + day_end`` days.  Both
    ends are whole days and inclusive, via pandas partial-string slicing on a
    DatetimeIndex.

    Args:
        df: frame with a 'date_time' column, either datetime64 or strings such
            as '2024-12-27_00:00:00' (underscores become spaces before parsing).
            The input frame is not modified.  Assumes date_time is sorted
            ascending; slicing a non-monotonic index may raise.
        args: dict with 'date' formatted '%Y-%m-%d 00:00:00', plus optional
            'day_begin' / 'day_end' like 'D0' / 'D13' (defaults D0 and D13).
            Everything after the first character is the day offset.

    Returns:
        A new DataFrame with a fresh RangeIndex.  'date_time' is the first
        column, formatted '%Y-%m-%d %H:%M:%S'.
    """
    base = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00')
    date_begin = base + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
    date_end = base + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))

    stamps = df['date_time']
    if not pd.api.types.is_datetime64_any_dtype(stamps):
        stamps = stamps.str.replace("_", " ", regex=False)
    stamps = pd.to_datetime(stamps)

    # assign() returns a copy, so the caller's frame is left untouched
    windowed = (
        df.assign(date_time=stamps)
        .set_index('date_time')
        .loc[date_begin.strftime('%Y-%m-%d'):date_end.strftime('%Y-%m-%d')]
        .reset_index()
    )
    windowed['date_time'] = windowed['date_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
    return windowed
# FTP source configuration, keyed by data-source name ('xxl').
# SECURITY: these credentials are hard-coded in source control.  Each value can
# be overridden through an environment variable.  The literal fallbacks are kept
# only for backward compatibility and should be removed once every deployment
# sets the variables.
ftp_params = {
    'xxl': {
        'host': os.environ.get('XXL_FTP_HOST', '39.107.246.215'),
        'user': os.environ.get('XXL_FTP_USER', 'jiayue'),
        'password': os.environ.get('XXL_FTP_PASSWORD', 'JYoguf2018'),
        'remote_dir': os.environ.get('XXL_FTP_REMOTE_DIR', './'),
        'local_dir': os.environ.get('XXL_LOCAL_DIR', '../cache/data/xxl'),
    }
}
@app.route('/data_nwp_ftp', methods=['POST'])
def get_nwp_from_ftp():
    """HTTP endpoint: store the cached NWP data for the posted parameters in MongoDB.

    Request values (form/query) are collected into a dict and passed unchanged
    to select_file_to_mongo().  The FTP download itself is done by the
    scheduled jobs, not by this handler.

    Returns a dict (serialized to JSON by Flask) containing:
        success     -- 1 on success, 0 if any exception was raised
        args        -- the parsed request values ({} if parsing failed)
        msg         -- traceback text with newlines replaced by tabs (failure only)
        start_time, end_time -- local timestamps formatted '%Y-%m-%d %H:%M:%S'
    """
    start_time = time.time()
    result = {}
    success = 0
    # initialised before the try so the response can still be built when
    # reading the request itself fails
    args = {}
    print("Program starts execution!")
    try:
        # 1. Parameters: date, source, moment, D0-Dx range, farm id, Mongo target
        args = request.values.to_dict()
        print('args', args)
        # Use the named logger directly: the module-global `logger` only
        # exists when this file is run as __main__.
        logging.getLogger("data_nwp_ftp").info(args)
        # 2. Downloading from FTP is handled by the scheduled jobs.
        # 3. Read the unpacked CSVs and store them in Mongo.
        select_file_to_mongo(args)
        success = 1
    except Exception:
        # str.replace returns a new string, so the result must be kept
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # module-global logger named 'data_nwp_ftp'; the request handler logs through it
    logger = logging.getLogger("data_nwp_ftp")
    from waitress import serve
    update_thread()  # start the scheduled (cron) download jobs
    # Fetch the most recent forecast cycle now instead of waiting for the next
    # scheduled run.  Cycles start every 6 hours, so floor the hour to
    # 00/06/12/18.
    current_hour = f"{datetime.datetime.now().hour // 6 * 6:02d}"
    threading.Thread(target=download_zip_files_from_ftp, args=(current_hour,)).start()
    # serve() blocks until the server stops, so announce startup before it
    print("server start!")
    serve(app, host="0.0.0.0", port=10102)
- # args = {"source": 'xxl', "date": '2024-12-27 00:00:00', 'moment': '06', 'farmId': 'J00645',
- # 'mongodb_database': 'db2', 'mongodb_write_table': 'j00645-d1', 'day_begin':'D0',
- # 'day_end': 'D13', 'isDq': True}
- # download_zip_files_from_ftp(hour='06')
- # select_file_to_mongo(args)
- # delete_zip_files('20241225')
|