|
@@ -16,6 +16,8 @@ import zipfile, tempfile, shutil, fnmatch
|
|
|
from common.database_dml import insert_data_into_mongo
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
+from ftplib import error_temp, error_perm
|
|
|
+from socket import error as socket_error
|
|
|
|
|
|
from common.logs import Log
|
|
|
logger = Log('data-processing').logger
|
|
@@ -117,6 +119,72 @@ def get_previous_moment(original_date, original_moment):
|
|
|
|
|
|
return new_date, new_moment
|
|
|
|
|
|
+
|
|
|
+def safe_ftp_download(ftp, remote_file_path, local_file_path, max_retries=3):
|
|
|
+ file_name = os.path.basename(local_file_path)
|
|
|
+ attempt = 0
|
|
|
+
|
|
|
+ while attempt < max_retries:
|
|
|
+ try:
|
|
|
+ # 初始化下载参数
|
|
|
+ ftp.pwd()
|
|
|
+ ftp.sendcmd("NOOP") # 保持连接活跃
|
|
|
+ ftp.voidcmd("TYPE I") # 确保二进制模式
|
|
|
+
|
|
|
+ # 记录开始时间
|
|
|
+ start = time.time()
|
|
|
+ logger.info(f"开始第 {attempt + 1} 次下载尝试: {remote_file_path}")
|
|
|
+
|
|
|
+ # 使用上下文管理器确保文件关闭
|
|
|
+ with open(local_file_path, 'wb') as local_file:
|
|
|
+ # 设置超时和被动模式
|
|
|
+ ftp.timeout = 3000
|
|
|
+ ftp.set_pasv(True)
|
|
|
+
|
|
|
+ # 带进度回调的下载
|
|
|
+ def _callback(data):
|
|
|
+ local_file.write(data)
|
|
|
+ logger.debug(f"已接收 {len(data)} 字节")
|
|
|
+
|
|
|
+ ftp.retrbinary(f'RETR {remote_file_path}', _callback)
|
|
|
+
|
|
|
+ # 验证文件完整性
|
|
|
+ remote_size = ftp.size(remote_file_path)
|
|
|
+ local_size = os.path.getsize(local_file_path)
|
|
|
+ if local_size != remote_size:
|
|
|
+ raise IOError(f"文件大小不匹配: 本地 {local_size} vs 远程 {remote_size}")
|
|
|
+
|
|
|
+ # 记录成功日志
|
|
|
+ end = time.time()
|
|
|
+ now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
|
+ logger_text = f"""下载成功!时间:{now.strftime('%Y-%m-%d %H:%M:%S')}
|
|
|
+ 文件:{file_name}
|
|
|
+ 耗时:{(end - start) / 60:.2f}分钟
|
|
|
+ 平均速度:{(remote_size / 1024 / 1024) / (end - start):.2f}MB/s"""
|
|
|
+ logger.info(logger_text)
|
|
|
+ send_message(file_name, logger_text)
|
|
|
+ return True
|
|
|
+
|
|
|
+ except (error_temp, error_perm, socket_error, IOError) as e:
|
|
|
+ logger.error(f"第 {attempt + 1} 次下载失败: {str(e)}")
|
|
|
+ # 删除不完整文件
|
|
|
+ if os.path.exists(local_file_path):
|
|
|
+ try:
|
|
|
+ os.remove(local_file_path)
|
|
|
+ logger.warning(f"已删除不完整文件: {local_file_path}")
|
|
|
+ except Exception as clean_error:
|
|
|
+ logger.error(f"文件清理失败: {str(clean_error)}")
|
|
|
+ attempt += 1
|
|
|
+ time.sleep(5) # 重试间隔
|
|
|
+
|
|
|
+ except Exception as unexpected_error:
|
|
|
+ logger.critical(f"未知错误: {str(unexpected_error)}")
|
|
|
+ raise
|
|
|
+ logger_text = f"下载失败: 已达最大重试次数 {max_retries}"
|
|
|
+ logger.error(logger_text)
|
|
|
+ send_message(file_name, logger_text)
|
|
|
+ return False
|
|
|
+
|
|
|
def download_zip_files_from_ftp(moment=None):
|
|
|
now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
|
date = now.strftime("%Y%m%d")
|
|
@@ -140,22 +208,23 @@ def download_zip_files_from_ftp(moment=None):
|
|
|
# 遍历文件列表,找到ZIP文件并下载
|
|
|
for file_name in files:
|
|
|
if fnmatch.fnmatch(file_name, zip_extension):
|
|
|
- start = time.time()
|
|
|
+ # start = time.time()
|
|
|
remote_file_path = os.path.join(remote_dir, file_name)
|
|
|
local_file_path = os.path.join(local_dir, file_name)
|
|
|
|
|
|
if os.path.isfile(local_file_path):
|
|
|
continue
|
|
|
-
|
|
|
- with open(local_file_path, 'wb') as local_file:
|
|
|
- logger.info(f"Downloading {remote_file_path} to {local_file_path}")
|
|
|
- ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
|
|
|
- end = time.time()
|
|
|
- now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
|
- logger_text = f"下载完成时间:{now.strftime('%Y-%m-%d %H:%M:%S')},下载 {file_name} 文件,用时 {(end - start)/60:.2f}分钟"
|
|
|
- logger.info(logger_text)
|
|
|
- send_message(file_name, logger_text)
|
|
|
- zip_file_path.append(local_file_path)
|
|
|
+ if safe_ftp_download(ftp, remote_file_path, local_file_path):
|
|
|
+ # with open(local_file_path, 'wb') as local_file:
|
|
|
+ # logger.info(f"Downloading {remote_file_path} to {local_file_path}")
|
|
|
+ # ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
|
|
|
+
|
|
|
+ # end = time.time()
|
|
|
+ # now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
|
|
|
+ # logger_text = f"下载完成时间:{now.strftime('%Y-%m-%d %H:%M:%S')},下载 {file_name} 文件,用时 {(end - start)/60:.2f}分钟"
|
|
|
+ # logger.info(logger_text)
|
|
|
+ # send_message(file_name, logger_text)
|
|
|
+ zip_file_path.append(local_file_path)
|
|
|
# 解压 ZIP 文件到临时目录
|
|
|
for zip_file_p in zip_file_path:
|
|
|
with zipfile.ZipFile(zip_file_p, 'r') as zip_ref:
|