#!/usr/bin/env python # -*- coding: utf-8 -*- # time: 2023/3/2 10:28 # file: config.py # author: David # company: shenyang JY """ 模型调参及系统功能配置 """ from tqdm import tqdm import pandas as pd from pathlib import Path from functools import partial from concurrent.futures import ProcessPoolExecutor from app.common.config import parser, logger from app.model.resource_manager import ResourceController from app.model.task_worker import station_task """" 调用思路 xxxx 1. 从入口参数中获取IN OUT文件位置 xxxx 2. 按照训练和预测加载和解析数据 3. 对数据进行预处理 4. 执行训练,保存模型,输出状态 5. 执行预测,输出结果,输出状态 """ """ 训练任务 1.将一个省份下的所有场站加入队列 2.队列中的每个场站是一个子任务,还有最终的区域级子任务 """ def main(): # ---------------------------- 解析参数 ---------------------------- # 解析参数,将固定参数和任务参数合并 opt = parser.parse_args_and_yaml() config = opt.__dict__ # 打印参数 logger.info(f"输入文件目录: {opt.input_file}") # ---------------------------- 配置计算资源和任务 ---------------------------- # 初始化资源管理器 rc = ResourceController( max_workers=opt.system['max_workers'], gpu_list=opt.system['gpu_devices'] ) # 生成任务列表 all_stations = [str(child) for child in Path(opt.input_file).iterdir() if child.is_dir()] # task_func = partial(station_task, config=config) # ---------------------------- 监控任务 ---------------------------- # 进度跟踪 completed = 0 with tqdm(total=len(all_stations)) as pbar: with ProcessPoolExecutor(max_workers=rc.cpu_cores) as executor: futures = [] for sid in all_stations: # 动态分配GPU gpu_id = rc.get_gpu() task_config = config.copy() task_config['gpu_assignment'] = gpu_id # 提交任务 future = executor.submit(station_task, sid, task_config) future.add_done_callback( lambda _: rc.release_gpu(task_config['gpu_assignment'])) futures.append(future) # 处理完成情况 for future in futures: result = future._result if result == 'success': completed += 1 pbar.update(1) pbar.set_postfix_str(f"Completed: {completed}/{len(all_stations)}") print(f"Final result: {completed} stations trained successfully") if __name__ == "__main__": main()