David 4 ماه پیش
والد
کامیت
abaa7301cc
2فایلهای تغییر یافته به همراه37 افزوده شده و 23 حذف شده
  1. 20 22
      data_processing/data_operation/data_nwp_ftp.py
  2. 17 1
      data_processing/data_operation/pre_data_ftp.py

+ 20 - 22
data_processing/data_operation/data_nwp_ftp.py

@@ -67,19 +67,27 @@ def delete_zip_files(date):
                     except OSError as e:
                         print(f"Error deleting file {csv_file_path}: {e.strerror}")
 
+def get_moment(schedule_dt=False):
+    if schedule_dt:
+        now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
+    else:
+        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+    if now.hour >= 18:
+        moment = '18'
+    elif now.hour >= 12:
+        moment = '12'
+    elif now.hour >= 6:
+        moment = '06'
+    else:
+        moment = '00'
+    return moment
+
 def download_zip_files_from_ftp(moment=None):
     now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
     date = now.strftime("%Y%m%d")
     date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
     if moment is None:
-        if now.hour >= 18:
-            moment = '18'
-        elif now.hour >= 12:
-            moment = '12'
-        elif now.hour >= 6:
-            moment = '06'
-        else:
-            moment = '00'
+        moment = get_moment()
     host = 'xxl'
     ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
     zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
@@ -114,8 +122,8 @@ def download_zip_files_from_ftp(moment=None):
     delete_zip_files(date_2)
 
 def select_file_to_mongo(args):
-    date, moment, farmId, isDq = args['date'], args['moment'], args['farmId'], args['isDq']
-    date = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00').strftime("%Y%m%d")
+    date, moment, farmId, isDq = args['dt'], get_moment(args['dt']), args['farmId'], args['isDq']
+    date = datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S').strftime("%Y%m%d")
     csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
     csv_file_weather = csv_file_format.replace('*', 'weather')
     csv_file_power = csv_file_format.replace('*', 'power')
@@ -158,7 +166,7 @@ def select_file_to_mongo(args):
 
 
 def select_dx_from_nwp(df, args):
-    date = datetime.datetime.strptime(args['date'], '%Y-%m-%d 00:00:00')
+    date = datetime.datetime.strptime(args['dt'], '%Y-%m-%d %H:%M:%S')
     date_begin = date + pd.Timedelta(days=int(args.get('day_begin', 'D0')[1:]))
     date_end = date + pd.Timedelta(days=int(args.get('day_end', 'D13')[1:]))
     df['date_time'] = df['date_time'].str.replace("_", " ")
@@ -216,22 +224,12 @@ if __name__ == "__main__":
     print("Program starts execution!")
     from waitress import serve
     update_thread() #定时任务开启
-    # current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
-    # current_hour = current_time.hour
-    # if current_hour >= 18:
-    #     current_hour = '18'
-    # elif current_hour >= 12:
-    #     current_hour = '12'
-    # elif current_hour >= 6:
-    #     current_hour = '06'
-    # else:
-    #     current_hour = '00'
     current_hour = '06' # 默认首次运行下载06时刻的zip
     threading.Thread(target=download_zip_files_from_ftp, kwargs={'moment': current_hour}).start()
     serve(app, host="0.0.0.0", port=10102)
     print("server start!")
 
-    # args = {"source": 'xxl', "date": '2024-12-27 00:00:00', 'moment': '06', 'farmId': 'J00645',
+    # args = {"source": 'xxl', "date": '2024-12-27', 'moment': '06', 'farmId': 'J00645',
     # 'mongodb_database': 'db2', 'mongodb_write_table': 'j00645-d1', 'day_begin':'D0',
     #         'day_end': 'D13', 'isDq': True}
     # download_zip_files_from_ftp(hour='06')

+ 17 - 1
data_processing/data_operation/pre_data_ftp.py

@@ -33,6 +33,21 @@ ftp_params = {
     }
 }
 
+def get_moment(schedule_dt=False):
+    if schedule_dt:
+        now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
+    else:
+        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+    if now.hour >= 18:
+        moment = '18'
+    elif now.hour >= 12:
+        moment = '12'
+    elif now.hour >= 6:
+        moment = '06'
+    else:
+        moment = '00'
+    return moment
+
 def zip_temp_file(df, args):
     def zip_folder(folder_path, zip_filePath):
         zip_file = zipfile.ZipFile(zip_filePath, ftp_params['zip_mode'], zipfile.ZIP_DEFLATED)
@@ -44,7 +59,8 @@ def zip_temp_file(df, args):
     temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
     current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
     dt = current_time.strftime('%Y%m%d')
-    modeler, model, version, farmId, moment = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId'], args['moment']
+    moment = get_moment() if args.get('dt') is None else get_moment(args.get('dt'))
+    modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
     csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, dt, moment)
     csv_path = os.path.join(temp_dir, farmId, csv_file)
     os.makedirs(os.path.dirname(csv_path), exist_ok=True)