|
@@ -28,7 +28,7 @@ def update_thread():
|
|
def start_jobs():
|
|
def start_jobs():
|
|
scheduler = BackgroundScheduler()
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.configure({'timezone': timezone("Asia/Shanghai")})
|
|
scheduler.configure({'timezone': timezone("Asia/Shanghai")})
|
|
- scheduler.add_job(func=download_zip_files_from_ftp, trigger="interval", seconds=300)
|
|
|
|
|
|
+ scheduler.add_job(func=download_zip_files_from_ftp, trigger="interval", seconds=900)
|
|
scheduler.start()
|
|
scheduler.start()
|
|
|
|
|
|
def match_date(date, filename):
|
|
def match_date(date, filename):
|
|
@@ -67,6 +67,29 @@ def delete_zip_files(date):
|
|
except OSError as e:
|
|
except OSError as e:
|
|
print(f"Error deleting file {csv_file_path}: {e.strerror}")
|
|
print(f"Error deleting file {csv_file_path}: {e.strerror}")
|
|
|
|
|
|
|
|
+def get_moment_next(schedule_dt=False):
|
|
|
|
+ if schedule_dt:
|
|
|
|
+ now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
|
|
|
|
+ else:
|
|
|
|
+ now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
|
|
+ if now.hour == 18:
|
|
|
|
+ moment = '18'
|
|
|
|
+ elif now.hour > 18:
|
|
|
|
+ moment = '00'
|
|
|
|
+ elif now.hour == 12:
|
|
|
|
+ moment = '12'
|
|
|
|
+ elif now.hour > 12:
|
|
|
|
+ moment = '18'
|
|
|
|
+ elif now.hour == 6:
|
|
|
|
+ moment = '06'
|
|
|
|
+ elif now.hour > 6:
|
|
|
|
+ moment = '12'
|
|
|
|
+ elif 2 >= now.hour >= 0:
|
|
|
|
+ moment = '00'
|
|
|
|
+ else:
|
|
|
|
+ moment = '06'
|
|
|
|
+ return moment
|
|
|
|
+
|
|
def get_moment(schedule_dt=False):
|
|
def get_moment(schedule_dt=False):
|
|
if schedule_dt:
|
|
if schedule_dt:
|
|
now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
|
|
now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
|
|
@@ -87,7 +110,7 @@ def download_zip_files_from_ftp(moment=None):
|
|
date = now.strftime("%Y%m%d")
|
|
date = now.strftime("%Y%m%d")
|
|
date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
|
|
date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
|
|
if moment is None:
|
|
if moment is None:
|
|
- moment = get_moment()
|
|
|
|
|
|
+ moment = get_moment_next()
|
|
host = 'xxl'
|
|
host = 'xxl'
|
|
ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
|
|
ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
|
|
zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
|
|
zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
|
|
@@ -125,7 +148,7 @@ def download_zip_files_from_ftp(moment=None):
|
|
delete_zip_files(date_2)
|
|
delete_zip_files(date_2)
|
|
|
|
|
|
def select_file_to_mongo(args):
|
|
def select_file_to_mongo(args):
|
|
- date, moment, farmId, isDq = args['dt'], get_moment(args['dt']), args['farmId'], args['isDq']
|
|
|
|
|
|
+ date, moment, farmId, isDq = args['dt'], get_moment_next(args['dt']), args['farmId'], args['isDq']
|
|
date = datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S').strftime("%Y%m%d")
|
|
date = datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S').strftime("%Y%m%d")
|
|
csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
|
|
csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
|
|
csv_file_weather = csv_file_format.replace('*', 'weather')
|
|
csv_file_weather = csv_file_format.replace('*', 'weather')
|
|
@@ -145,9 +168,10 @@ def select_file_to_mongo(args):
|
|
logger.info("找到power:{}".format(csv_power_path))
|
|
logger.info("找到power:{}".format(csv_power_path))
|
|
if csv_weather_path or csv_power_path:
|
|
if csv_weather_path or csv_power_path:
|
|
break
|
|
break
|
|
-
|
|
|
|
|
|
+ if csv_weather_path is False:
|
|
|
|
+ raise ValueError("获取nwp文件异常:找不到文件")
|
|
# 使用 pandas 读取 CSV 文件
|
|
# 使用 pandas 读取 CSV 文件
|
|
- weather = pd.read_csv(csv_weather_path) if csv_weather_path else ValueError("获取nwp文件异常")
|
|
|
|
|
|
+ weather = pd.read_csv(csv_weather_path)
|
|
power = pd.read_csv(csv_power_path) if csv_power_path else None
|
|
power = pd.read_csv(csv_power_path) if csv_power_path else None
|
|
if isDq:
|
|
if isDq:
|
|
if csv_weather_path and csv_power_path:
|
|
if csv_weather_path and csv_power_path:
|