David 4 mesi fa
parent
commit
8b44d20bfc
1 ha cambiato i file con 21 aggiunte e 19 eliminazioni
  1. 21 19
      data_processing/data_operation/data_nwp_ftp.py

+ 21 - 19
data_processing/data_operation/data_nwp_ftp.py

@@ -92,28 +92,29 @@ def download_zip_files_from_ftp(moment=None):
     ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
     zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
     zip_file_path = []
-    # 连接到FTP服务器
-    with FTP(ftp_host) as ftp:
-        ftp.login(user=ftp_user, passwd=ftp_password)
+    if len(glob.glob(os.path.join(local_dir, zip_extension))) == 0:
+        # 连接到FTP服务器
+        with FTP(ftp_host) as ftp:
+            ftp.login(user=ftp_user, passwd=ftp_password)
 
-        # 切换到远程目录
-        ftp.cwd(remote_dir)
+            # 切换到远程目录
+            ftp.cwd(remote_dir)
 
-        # 获取远程目录中的文件和目录列表
-        files = ftp.nlst()
+            # 获取远程目录中的文件和目录列表
+            files = ftp.nlst()
 
-        # 遍历文件列表,找到ZIP文件并下载
-        for file_name in files:
-            if fnmatch.fnmatch(file_name, zip_extension):
-                remote_file_path = os.path.join(remote_dir, file_name)
-                local_file_path = os.path.join(local_dir, file_name)
+            # 遍历文件列表,找到ZIP文件并下载
+            for file_name in files:
+                if fnmatch.fnmatch(file_name, zip_extension):
+                    remote_file_path = os.path.join(remote_dir, file_name)
+                    local_file_path = os.path.join(local_dir, file_name)
 
-                with open(local_file_path, 'wb') as local_file:
-                    logger.info(f"Downloading {remote_file_path} to {local_file_path}")
-                    ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
+                    with open(local_file_path, 'wb') as local_file:
+                        logger.info(f"Downloading {remote_file_path} to {local_file_path}")
+                        ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
 
-                logger.info(f"Downloaded {file_name}")
-                zip_file_path.append(local_file_path)
+                    logger.info(f"Downloaded {file_name}")
+                    zip_file_path.append(local_file_path)
     # 解压 ZIP 文件到临时目录
     for zip_file_p in zip_file_path:
         with zipfile.ZipFile(zip_file_p, 'r') as zip_ref:
@@ -136,14 +137,15 @@ def select_file_to_mongo(args):
                 csv_file_path = os.path.join(target_dir_path, file_name)
                 if fnmatch.fnmatch(file_name, csv_file_weather):
                     csv_weather_path = csv_file_path
-                    logger.info("找到nwp:{}".format(csv_power_path))
+                    logger.info("找到nwp:{}".format(csv_weather_path))
                 if fnmatch.fnmatch(file_name, csv_file_power):
                     csv_power_path = csv_file_path
+                    logger.info("找到power:{}".format(csv_power_path))
             if csv_weather_path or csv_power_path:
                 break
 
     # 使用 pandas 读取 CSV 文件
-    weather = pd.read_csv(csv_weather_path) if csv_power_path else None
+    weather = pd.read_csv(csv_weather_path) if csv_weather_path else ValueError("获取nwp文件异常")
     power = pd.read_csv(csv_power_path) if csv_power_path else None
     if isDq:
         if csv_weather_path and csv_power_path: