|
@@ -67,8 +67,8 @@ def delete_zip_files(date):
|
|
except OSError as e:
|
|
except OSError as e:
|
|
print(f"Error deleting file {csv_file_path}: {e.strerror}")
|
|
print(f"Error deleting file {csv_file_path}: {e.strerror}")
|
|
|
|
|
|
-def get_moment_next(schedule_dt=False):
|
|
|
|
- if schedule_dt:
|
|
|
|
|
|
+def get_moment_next(schedule_dt=None):
|
|
|
|
+ if schedule_dt is not None:
|
|
now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
|
|
now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
|
|
else:
|
|
else:
|
|
now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
@@ -91,6 +91,23 @@ def get_moment_next(schedule_dt=False):
|
|
moment = '06'
|
|
moment = '06'
|
|
return date, moment
|
|
return date, moment
|
|
|
|
|
|
|
|
+def get_previous_moment(original_date, original_moment):
|
|
|
|
+ # 将原date和moment转换为上海时区的datetime对象
|
|
|
|
+ tz_shanghai = timezone('Asia/Shanghai')
|
|
|
|
+ original_dt = datetime.datetime.strptime(f"{original_date}{original_moment}", "%Y%m%d%H")
|
|
|
|
+ original_dt = tz_shanghai.localize(original_dt)
|
|
|
|
+
|
|
|
|
+ # 减去6小时
|
|
|
|
+ new_dt = original_dt - datetime.timedelta(hours=6)
|
|
|
|
+
|
|
|
|
+ # 转换为原方法可接受的schedule_dt字符串格式
|
|
|
|
+ schedule_dt_str = new_dt.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
+
|
|
|
|
+ # 调用原方法获取新的date和moment
|
|
|
|
+ new_date, new_moment = get_moment_next(schedule_dt=schedule_dt_str)
|
|
|
|
+
|
|
|
|
+ return new_date, new_moment
|
|
|
|
+
|
|
def download_zip_files_from_ftp(moment=None):
|
|
def download_zip_files_from_ftp(moment=None):
|
|
now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
date = now.strftime("%Y%m%d")
|
|
date = now.strftime("%Y%m%d")
|
|
@@ -133,8 +150,7 @@ def download_zip_files_from_ftp(moment=None):
|
|
# 删除前天之前所有 ZIP 文件
|
|
# 删除前天之前所有 ZIP 文件
|
|
delete_zip_files(date_2)
|
|
delete_zip_files(date_2)
|
|
|
|
|
|
-def select_file_to_mongo(args):
|
|
|
|
- date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
|
|
|
|
|
|
+def select_file_to_mongo(args, date, moment):
|
|
farmId, isDq = args['farmId'], args['isDq']
|
|
farmId, isDq = args['farmId'], args['isDq']
|
|
csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
|
|
csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
|
|
csv_file_weather = csv_file_format.replace('*', 'weather')
|
|
csv_file_weather = csv_file_format.replace('*', 'weather')
|
|
@@ -148,14 +164,15 @@ def select_file_to_mongo(args):
|
|
csv_file_path = os.path.join(target_dir_path, file_name)
|
|
csv_file_path = os.path.join(target_dir_path, file_name)
|
|
if fnmatch.fnmatch(file_name, csv_file_weather):
|
|
if fnmatch.fnmatch(file_name, csv_file_weather):
|
|
csv_weather_path = csv_file_path
|
|
csv_weather_path = csv_file_path
|
|
- logger.info("找到nwp:{}".format(csv_weather_path))
|
|
|
|
|
|
+ logger.info("***找到nwp:{}***".format(csv_weather_path))
|
|
if fnmatch.fnmatch(file_name, csv_file_power):
|
|
if fnmatch.fnmatch(file_name, csv_file_power):
|
|
csv_power_path = csv_file_path
|
|
csv_power_path = csv_file_path
|
|
- logger.info("找到power:{}".format(csv_power_path))
|
|
|
|
|
|
+ logger.info("***找到power:{}***".format(csv_power_path))
|
|
if csv_weather_path or csv_power_path:
|
|
if csv_weather_path or csv_power_path:
|
|
break
|
|
break
|
|
if csv_weather_path is False:
|
|
if csv_weather_path is False:
|
|
- raise ValueError("获取nwp文件异常:找不到场站 {} nwp文件".format(farmId))
|
|
|
|
|
|
+ logger.info("获取nwp文件异常:找不到场站 {} nwp文件".format(farmId))
|
|
|
|
+ return False
|
|
# 使用 pandas 读取 CSV 文件
|
|
# 使用 pandas 读取 CSV 文件
|
|
weather = pd.read_csv(csv_weather_path)
|
|
weather = pd.read_csv(csv_weather_path)
|
|
power = pd.read_csv(csv_power_path) if csv_power_path else None
|
|
power = pd.read_csv(csv_power_path) if csv_power_path else None
|
|
@@ -178,10 +195,11 @@ def select_file_to_mongo(args):
|
|
insert_data_into_mongo(df, args)
|
|
insert_data_into_mongo(df, args)
|
|
else:
|
|
else:
|
|
logger.info(f"CSV 文件 {csv_file_weather} 在目标目录 {farmId} 中未找到")
|
|
logger.info(f"CSV 文件 {csv_file_weather} 在目标目录 {farmId} 中未找到")
|
|
|
|
+ return True
|
|
|
|
|
|
|
|
|
|
def select_dx_from_nwp(df, args):
|
|
def select_dx_from_nwp(df, args):
|
|
- date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
|
|
|
|
|
|
+ date, moment = get_moment_next(args.get('dt'))
|
|
date = datetime.datetime.strptime(date, "%Y%m%d")
|
|
date = datetime.datetime.strptime(date, "%Y%m%d")
|
|
date_begin = date + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
|
|
date_begin = date + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
|
|
date_end = date + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))
|
|
date_end = date + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))
|
|
@@ -210,7 +228,7 @@ def get_nwp_from_ftp():
|
|
# 获取程序开始时间
|
|
# 获取程序开始时间
|
|
start_time = time.time()
|
|
start_time = time.time()
|
|
result = {}
|
|
result = {}
|
|
- success = 0
|
|
|
|
|
|
+ success = False
|
|
args = {}
|
|
args = {}
|
|
# print("data_nwp_ftp starts execution!")
|
|
# print("data_nwp_ftp starts execution!")
|
|
try:
|
|
try:
|
|
@@ -220,8 +238,13 @@ def get_nwp_from_ftp():
|
|
logger.info(args)
|
|
logger.info(args)
|
|
# 2. 连接FTP,从FTP服务器中获取指定参数的压缩文件(定时任务)
|
|
# 2. 连接FTP,从FTP服务器中获取指定参数的压缩文件(定时任务)
|
|
# 3. 解压压缩文件,将其存储到mongo中
|
|
# 3. 解压压缩文件,将其存储到mongo中
|
|
- select_file_to_mongo(args)
|
|
|
|
- success = 1
|
|
|
|
|
|
+ date, moment = get_moment_next(args.get('dt'))
|
|
|
|
+ logger.info("从日期{}时刻{}开始查找文件".format(date, moment))
|
|
|
|
+ success = select_file_to_mongo(args, date, moment)
|
|
|
|
+ while not success:
|
|
|
|
+ date, moment = get_previous_moment(date, moment)
|
|
|
|
+ logger.info("未找到,从日期{}时刻{}开始查找文件".format(date, moment))
|
|
|
|
+ success = select_file_to_mongo(args, date, moment)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
my_exception = traceback.format_exc()
|
|
my_exception = traceback.format_exc()
|
|
my_exception.replace("\n", "\t")
|
|
my_exception.replace("\n", "\t")
|
|
@@ -240,8 +263,10 @@ if __name__ == "__main__":
|
|
print("Program starts execution!")
|
|
print("Program starts execution!")
|
|
from waitress import serve
|
|
from waitress import serve
|
|
update_thread() #定时任务开启
|
|
update_thread() #定时任务开启
|
|
- current_hour = '06' # 默认首次运行下载06时刻的zip
|
|
|
|
- threading.Thread(target=download_zip_files_from_ftp, kwargs={'moment': current_hour}).start()
|
|
|
|
|
|
+ # 遍历参数,创建并启动线程
|
|
|
|
+ for moment in ['00', '06', '12', '18']:
|
|
|
|
+ threading.Thread(target=download_zip_files_from_ftp, kwargs={'moment': moment}).start()
|
|
|
|
+
|
|
serve(app, host="0.0.0.0", port=10102)
|
|
serve(app, host="0.0.0.0", port=10102)
|
|
print("server start!")
|
|
print("server start!")
|
|
|
|
|