retuve.batch
This module contains the functions that run the retuve pipeline on a batch of files, as well as the functions behind the CLI commands for running retuve on a single file or a batch of files.
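For illustration, a CLI command built on these functions might be wired up roughly as follows. Only run_single and run_batch come from this module; the argparse wrapper, the main entry point, and the assumption that a Config instance is constructed elsewhere are all illustrative, not part of retuve.batch.

# Hypothetical CLI wrapper around run_single/run_batch (illustrative only).
import argparse

from retuve.batch import run_batch, run_single
from retuve.keyphrases.config import Config


def main(config: Config):
    parser = argparse.ArgumentParser(description="Run the retuve pipeline.")
    parser.add_argument("--file", help="Process a single file instead of a batch.")
    args = parser.parse_args()

    if args.file:
        # run_single returns an error message, or None on success.
        error = run_single(config, args.file)
        if error:
            print(error)
    else:
        run_batch(config)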
# Copyright 2024 Adam McArthur
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the functions to run the retuve pipeline on a batch of
files. It also has the functions used to make the CLI commands for running retuve
on a single file or a batch of files.
"""

import json
import multiprocessing
import os
import shutil
import time
import traceback

import torch

from retuve.funcs import retuve_run
from retuve.keyphrases.config import Config
from retuve.keyphrases.enums import Outputs
from retuve.logs import ulogger


def run_single(
    config: Config,
    file_name: str,
    for_batch: bool = False,
    local_savedir: str = None,
):
    """
    Run the retuve pipeline on a single file.

    :param config: The configuration.
    :param file_name: The file name.
    :param for_batch: Whether the file is being processed in a batch.
        This changes the way the file is saved.
    :param local_savedir: The local save directory.

    :return: The error message if any.
    """

    fileid = file_name.split("/")[-1].split(".")[0]
    if local_savedir:
        savedir = local_savedir
    else:
        savedir = config.api.savedir

    hip_mode = config.batch.hip_mode

    if for_batch:
        fileid += "/"

        # if the metrics.json already exists, skip
        if os.path.exists(f"{savedir}/{fileid}metrics.json"):
            return f"File {fileid} already processed"

        if os.path.exists(f"{savedir}/{fileid}"):
            shutil.rmtree(f"{savedir}/{fileid}")

        os.makedirs(f"{savedir}/{fileid}")
    else:
        fileid += "_"

    try:
        retuve_result = retuve_run(
            hip_mode=hip_mode,
            config=config,
            modes_func=config.batch.mode_func,
            modes_func_kwargs_dict=config.batch.mode_func_args,
            file=file_name,
        )
        hip_datas = retuve_result.hip_datas

        if hip_datas and hip_datas.recorded_error:
            ulogger.info(
                f"\n Recorded Error: {hip_datas.recorded_error} "
                f"Critical: {hip_datas.recorded_error.critical}"
            )

        if retuve_result.image is not None:
            retuve_result.image.save(f"{savedir}/{fileid}{Outputs.IMAGE}")

        if retuve_result.metrics and retuve_result.metrics.get("dev_metrics"):
            ulogger.info(
                f"\n Dev Metrics: {retuve_result.metrics['dev_metrics']}"
            )

        if retuve_result.video_clip is not None:
            retuve_result.video_clip.write_videofile(
                f"{savedir}/{fileid}{Outputs.VIDEO_CLIP}",
            )

        if retuve_result.visual_3d is not None:
            retuve_result.visual_3d.write_html(
                f"{savedir}/{fileid}{Outputs.VISUAL3D}"
            )

        # save the metrics to a file
        with open(f"{savedir}/{fileid}{Outputs.METRICS}", "w") as f:
            f.write(json.dumps(retuve_result.metrics))

    except Exception:
        error = traceback.format_exc()
        ulogger.error(f"Error processing file {file_name}: {error}")
        return error


def run_batch(config: Config):
    """
    Run the retuve pipeline on a batch of files.

    :param config: The configuration.
    """
    all_files = []

    for dataset in config.batch.datasets:
        files = os.listdir(dataset)

        files = [
            f"{dataset}/{file}"
            for file in files
            if any(
                file.endswith(input_type)
                for input_type in config.batch.input_types
            )
        ]

        all_files.extend(files)

    start = time.time()

    # create savedir if it doesn't exist
    if not os.path.exists(config.api.savedir):
        os.makedirs(config.api.savedir)

    torch.multiprocessing.set_start_method("spawn", force=True)

    with multiprocessing.Pool(processes=config.batch.processes) as pool:
        chunks = [(config, file, True) for file in all_files]
        errors = pool.starmap(run_single, chunks)

    if any(error is not None for error in errors):
        already_processed = sum(
            "already processed" in error
            for error in errors
            if error is not None
        )
        # count and remove all errors containing "already processed"
        errors = [
            error
            for error in errors
            if error is not None and "already processed" not in error
        ]

        for error in errors:
            ulogger.info(error)

        ulogger.info(f"Errors: {len(errors)}")
        ulogger.info(f"Already processed: {already_processed}")

    end = time.time()

    if len(all_files) == 0:
        ulogger.info(
            f"No files with types in {config.batch.input_types} "
            "found in the directory"
        )
        return

    # convert to minutes and seconds
    minutes, seconds = divmod(end - start, 60)
    ulogger.info(f"Time taken: {minutes:.0f}m {seconds:.0f}s")

    # Halve the count to ignore the .nii files
    no_of_files = len(all_files) // 2

    if no_of_files == 0:
        no_of_files = 1

    # Print average time per file
    ulogger.info(f"Average time per file: {(end - start) / no_of_files:.2f}s")

    # Print number of files
    ulogger.info(f"Number of files: {no_of_files}")
def run_single(
    config: retuve.keyphrases.config.Config,
    file_name: str,
    for_batch: bool = False,
    local_savedir: str = None,
)
Run the retuve pipeline on a single file.
Parameters
- config: The retuve configuration (a Config instance).
- file_name: The path of the file to process.
- for_batch: Whether the file is being processed as part of a batch. When True, outputs are written to a per-file subdirectory of the save directory; when False, they are written alongside each other with the file ID as a filename prefix.
- local_savedir: An optional save directory that overrides config.api.savedir.
Returns
The error message, if any: a formatted traceback on failure, or a note that the file was already processed (batch mode only). Returns None on success.
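A usage sketch (illustrative only: constructing the Config is outside this module, and the file path and save directory below are placeholders):

# `config` is assumed to be an already-constructed retuve Config.
from retuve.batch import run_single

error = run_single(config, "/data/cases/case_001.nii", local_savedir="./results")
if error:
    print(f"run_single reported: {error}")
else:
    # With for_batch=False, outputs are written to ./results with a
    # "case_001_" filename prefix.
    print("Processed case_001")

When called with for_batch=True (as run_batch does), outputs go into a per-file subdirectory instead, and the function returns an "already processed" message without re-running if that subdirectory already contains metrics.json.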
def run_batch(config: retuve.keyphrases.config.Config)
Run the retuve pipeline on a batch of files.
Parameters
- config: The retuve configuration. config.batch supplies the dataset directories, accepted input types, and number of worker processes; results are written under config.api.savedir.
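A usage sketch (illustrative only: constructing the Config is outside this module; the comments list some of the fields a batch run reads):

# `config` is assumed to be an already-constructed retuve Config.
from retuve.batch import run_batch

# Some of the fields read during a batch run:
#   config.batch.datasets    - directories scanned for input files
#   config.batch.input_types - accepted file extensions
#   config.batch.processes   - size of the multiprocessing worker pool
#   config.api.savedir       - root directory for per-file result folders
run_batch(config)

Per-file failures are logged rather than raised, and files whose metrics.json already exists are counted as "already processed" and skipped.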