David 5 days ago
parent
commit
9fbb171e93
1 changed files with 80 additions and 11 deletions
  1. 80 11
      data_processing/data_operation/data_nwp_ftp.py

+ 80 - 11
data_processing/data_operation/data_nwp_ftp.py

@@ -16,6 +16,8 @@ import zipfile, tempfile, shutil, fnmatch
 from common.database_dml import insert_data_into_mongo
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.cron import CronTrigger
+from ftplib import error_temp, error_perm
+from socket import error as socket_error
 
 from common.logs import Log
 logger = Log('data-processing').logger
@@ -117,6 +119,72 @@ def get_previous_moment(original_date, original_moment):
 
     return new_date, new_moment
 
+
+def safe_ftp_download(ftp, remote_file_path, local_file_path, max_retries=3):
+    file_name = os.path.basename(local_file_path)
+    attempt = 0
+
+    while attempt < max_retries:
+        try:
+            # 初始化下载参数
+            ftp.pwd()
+            ftp.sendcmd("NOOP")  # 保持连接活跃
+            ftp.voidcmd("TYPE I")  # 确保二进制模式
+
+            # 记录开始时间
+            start = time.time()
+            logger.info(f"开始第 {attempt + 1} 次下载尝试: {remote_file_path}")
+
+            # 使用上下文管理器确保文件关闭
+            with open(local_file_path, 'wb') as local_file:
+                # 设置超时和被动模式
+                ftp.timeout = 3000
+                ftp.set_pasv(True)
+
+                # 带进度回调的下载
+                def _callback(data):
+                    local_file.write(data)
+                    logger.debug(f"已接收 {len(data)} 字节")
+
+                ftp.retrbinary(f'RETR {remote_file_path}', _callback)
+
+            # 验证文件完整性
+            remote_size = ftp.size(remote_file_path)
+            local_size = os.path.getsize(local_file_path)
+            if local_size != remote_size:
+                raise IOError(f"文件大小不匹配: 本地 {local_size} vs 远程 {remote_size}")
+
+            # 记录成功日志
+            end = time.time()
+            now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+            logger_text = f"""下载成功!时间:{now.strftime('%Y-%m-%d %H:%M:%S')}
+                文件:{file_name}
+                耗时:{(end - start) / 60:.2f}分钟
+                平均速度:{(remote_size / 1024 / 1024) / (end - start):.2f}MB/s"""
+            logger.info(logger_text)
+            send_message(file_name, logger_text)
+            return True
+
+        except (error_temp, error_perm, socket_error, IOError) as e:
+            logger.error(f"第 {attempt + 1} 次下载失败: {str(e)}")
+            # 删除不完整文件
+            if os.path.exists(local_file_path):
+                try:
+                    os.remove(local_file_path)
+                    logger.warning(f"已删除不完整文件: {local_file_path}")
+                except Exception as clean_error:
+                    logger.error(f"文件清理失败: {str(clean_error)}")
+            attempt += 1
+            time.sleep(5)  # 重试间隔
+
+        except Exception as unexpected_error:
+            logger.critical(f"未知错误: {str(unexpected_error)}")
+            raise
+    logger_text = f"下载失败: 已达最大重试次数 {max_retries}"
+    logger.error(logger_text)
+    send_message(file_name, logger_text)
+    return False
+
 def download_zip_files_from_ftp(moment=None):
     now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
     date = now.strftime("%Y%m%d")
@@ -140,22 +208,23 @@ def download_zip_files_from_ftp(moment=None):
         # 遍历文件列表,找到ZIP文件并下载
         for file_name in files:
             if fnmatch.fnmatch(file_name, zip_extension):
-                start = time.time()
+                # start = time.time()
                 remote_file_path = os.path.join(remote_dir, file_name)
                 local_file_path = os.path.join(local_dir, file_name)
 
                 if os.path.isfile(local_file_path):
                     continue
-
-                with open(local_file_path, 'wb') as local_file:
-                    logger.info(f"Downloading {remote_file_path} to {local_file_path}")
-                    ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
-                end = time.time()
-                now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
-                logger_text = f"下载完成时间:{now.strftime('%Y-%m-%d %H:%M:%S')},下载 {file_name} 文件,用时 {(end - start)/60:.2f}分钟"
-                logger.info(logger_text)
-                send_message(file_name, logger_text)
-                zip_file_path.append(local_file_path)
+                if safe_ftp_download(ftp, remote_file_path, local_file_path):
+                # with open(local_file_path, 'wb') as local_file:
+                #     logger.info(f"Downloading {remote_file_path} to {local_file_path}")
+                #     ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
+
+                # end = time.time()
+                # now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+                # logger_text = f"下载完成时间:{now.strftime('%Y-%m-%d %H:%M:%S')},下载 {file_name} 文件,用时 {(end - start)/60:.2f}分钟"
+                # logger.info(logger_text)
+                # send_message(file_name, logger_text)
+                    zip_file_path.append(local_file_path)
     # 解压 ZIP 文件到临时目录
     for zip_file_p in zip_file_path:
         with zipfile.ZipFile(zip_file_p, 'r') as zip_ref: