retuve.batch

This module contains the functions to run the retuve pipeline on a batch of files. It also has the functions used to make the CLI commands for running retuve on a single file or a batch of files.

  1# Copyright 2024 Adam McArthur
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16This module contains the functions to run the retuve pipeline on a batch of
 17files. It also has the functions used to make the CLI commands for running retuve
 18on a single file or a batch of files.
 19"""
 20
 21import json
 22import multiprocessing
 23import os
 24import shutil
 25import time
 26import traceback
 27
 28import torch
 29
 30from retuve.funcs import retuve_run
 31from retuve.keyphrases.config import Config
 32from retuve.keyphrases.enums import Outputs
 33from retuve.logs import ulogger
 34
 35
 36def run_single(
 37    config: Config,
 38    file_name: str,
 39    for_batch: bool = False,
 40    local_savedir: str = None,
 41):
 42    """
 43    Run the retuve pipeline on a single file.
 44
 45    :param config: The configuration.
 46    :param file_name: The file name.
 47    :param for_batch: Whether the file is being processed in a batch.
 48                      This changes the way the file is saved.
 49    :param local_savedir: The local save directory.
 50
 51    :return: The error message if any.
 52    """
 53
 54    fileid = file_name.split("/")[-1].split(".")[0]
 55    if local_savedir:
 56        savedir = local_savedir
 57    else:
 58        savedir = config.api.savedir
 59
 60    hip_mode = config.batch.hip_mode
 61
 62    if for_batch:
 63        fileid += "/"
 64
 65        # if the metrics.json already exist, skip
 66        if os.path.exists(f"{savedir}/{fileid}metrics.json"):
 67            return f"File {fileid} already processed"
 68
 69        if os.path.exists(f"{savedir}/{fileid}"):
 70            shutil.rmtree(f"{savedir}/{fileid}")
 71
 72        os.makedirs(f"{savedir}/{fileid}")
 73    else:
 74        fileid += "_"
 75
 76    try:
 77        retuve_result = retuve_run(
 78            hip_mode=hip_mode,
 79            config=config,
 80            modes_func=config.batch.mode_func,
 81            modes_func_kwargs_dict=config.batch.mode_func_args,
 82            file=file_name,
 83        )
 84        hip_datas = retuve_result.hip_datas
 85
 86        if hip_datas and hip_datas.recorded_error:
 87            ulogger.info(
 88                f"\n Recorded Error: {hip_datas.recorded_error} "
 89                f"Critical: {hip_datas.recorded_error.critical}"
 90            )
 91
 92        if retuve_result.image is not None:
 93            retuve_result.image.save(f"{savedir}/{fileid}{Outputs.IMAGE}")
 94
 95        if retuve_result.metrics and retuve_result.metrics.get("dev_metrics"):
 96            ulogger.info(
 97                "\n Dev Metrics: ", retuve_result.metrics["dev_metrics"]
 98            )
 99
100        if retuve_result.video_clip is not None:
101            retuve_result.video_clip.write_videofile(
102                f"{savedir}/{fileid}{Outputs.VIDEO_CLIP}",
103            )
104
105        if retuve_result.visual_3d is not None:
106            retuve_result.visual_3d.write_html(
107                f"{savedir}/{fileid}{Outputs.VISUAL3D}"
108            )
109
110        # save the metrics to a file
111        with open(f"{savedir}/{fileid}{Outputs.METRICS}", "w") as f:
112            f.write(json.dumps(retuve_result.metrics))
113
114    except Exception as e:
115        e = traceback.format_exc()
116        ulogger.error(f"Error processing file {file_name}: {e}")
117        return e
118
119
def run_batch(config: Config):
    """
    Run the retuve pipeline on a batch of files.

    Every file in ``config.batch.datasets`` whose name ends with one of
    ``config.batch.input_types`` is processed by ``run_single`` in a pool of
    ``config.batch.processes`` worker processes. Failures, skipped files and
    timing statistics are logged.

    :param config: The configuration.

    :return: None.
    """
    # str.endswith accepts a tuple of suffixes, so one call covers all types.
    input_types = tuple(config.batch.input_types)

    all_files = [
        f"{dataset}/{file}"
        for dataset in config.batch.datasets
        for file in os.listdir(dataset)
        if file.endswith(input_types)
    ]

    start = time.time()

    # Create savedir if it doesn't exist (no check-then-create race).
    os.makedirs(config.api.savedir, exist_ok=True)

    torch.multiprocessing.set_start_method("spawn", force=True)

    # Nothing to do: report it before paying for a worker pool.
    if not all_files:
        ulogger.info(
            f"No files with types in {config.batch.input_types} "
            "found in the directory"
        )
        return

    with multiprocessing.Pool(processes=config.batch.processes) as pool:
        chunks = [(config, file, True) for file in all_files]
        results = pool.starmap(run_single, chunks)

    # run_single returns None on success and a message otherwise.
    messages = [result for result in results if result is not None]

    if messages:
        # Separate genuine failures from "already processed" skip notices.
        errors = [
            message
            for message in messages
            if "already processed" not in message
        ]
        already_processed = len(messages) - len(errors)

        for error in errors:
            ulogger.info(error)

        ulogger.info(f"Errors: {len(errors)}")
        ulogger.info(f"Already processed: {already_processed}")

    elapsed = time.time() - start

    # Convert to minutes and seconds.
    minutes, seconds = divmod(elapsed, 60)
    ulogger.info(f"Time taken: {minutes:.0f}m {seconds:.0f}s")

    # Half to ignore the .nii files.
    # NOTE(review): every entry of all_files is passed to run_single, so
    # halving presumably assumes each scan is paired with a .nii file -
    # confirm this still holds for the configured input types.
    no_of_files = max(len(all_files) // 2, 1)

    # Print average time per file.
    ulogger.info(f"Average time per file: {elapsed / no_of_files:.2f}s")

    # Print number of files.
    ulogger.info(f"Number of files: {no_of_files}")
def run_single( config: retuve.keyphrases.config.Config, file_name: str, for_batch: bool = False, local_savedir: str = None):
 37def run_single(
 38    config: Config,
 39    file_name: str,
 40    for_batch: bool = False,
 41    local_savedir: str = None,
 42):
 43    """
 44    Run the retuve pipeline on a single file.
 45
 46    :param config: The configuration.
 47    :param file_name: The file name.
 48    :param for_batch: Whether the file is being processed in a batch.
 49                      This changes the way the file is saved.
 50    :param local_savedir: The local save directory.
 51
 52    :return: The error message if any.
 53    """
 54
 55    fileid = file_name.split("/")[-1].split(".")[0]
 56    if local_savedir:
 57        savedir = local_savedir
 58    else:
 59        savedir = config.api.savedir
 60
 61    hip_mode = config.batch.hip_mode
 62
 63    if for_batch:
 64        fileid += "/"
 65
 66        # if the metrics.json already exist, skip
 67        if os.path.exists(f"{savedir}/{fileid}metrics.json"):
 68            return f"File {fileid} already processed"
 69
 70        if os.path.exists(f"{savedir}/{fileid}"):
 71            shutil.rmtree(f"{savedir}/{fileid}")
 72
 73        os.makedirs(f"{savedir}/{fileid}")
 74    else:
 75        fileid += "_"
 76
 77    try:
 78        retuve_result = retuve_run(
 79            hip_mode=hip_mode,
 80            config=config,
 81            modes_func=config.batch.mode_func,
 82            modes_func_kwargs_dict=config.batch.mode_func_args,
 83            file=file_name,
 84        )
 85        hip_datas = retuve_result.hip_datas
 86
 87        if hip_datas and hip_datas.recorded_error:
 88            ulogger.info(
 89                f"\n Recorded Error: {hip_datas.recorded_error} "
 90                f"Critical: {hip_datas.recorded_error.critical}"
 91            )
 92
 93        if retuve_result.image is not None:
 94            retuve_result.image.save(f"{savedir}/{fileid}{Outputs.IMAGE}")
 95
 96        if retuve_result.metrics and retuve_result.metrics.get("dev_metrics"):
 97            ulogger.info(
 98                "\n Dev Metrics: ", retuve_result.metrics["dev_metrics"]
 99            )
100
101        if retuve_result.video_clip is not None:
102            retuve_result.video_clip.write_videofile(
103                f"{savedir}/{fileid}{Outputs.VIDEO_CLIP}",
104            )
105
106        if retuve_result.visual_3d is not None:
107            retuve_result.visual_3d.write_html(
108                f"{savedir}/{fileid}{Outputs.VISUAL3D}"
109            )
110
111        # save the metrics to a file
112        with open(f"{savedir}/{fileid}{Outputs.METRICS}", "w") as f:
113            f.write(json.dumps(retuve_result.metrics))
114
115    except Exception as e:
116        e = traceback.format_exc()
117        ulogger.error(f"Error processing file {file_name}: {e}")
118        return e

Run the retuve pipeline on a single file.

Parameters
  • config: The configuration.
  • file_name: The file name.
  • for_batch: Whether the file is being processed in a batch. This changes the way the file is saved.
  • local_savedir: The local save directory.
Returns

The error message if any.

def run_batch(config: retuve.keyphrases.config.Config):
def run_batch(config: Config):
    """
    Run the retuve pipeline on a batch of files.

    Every file in ``config.batch.datasets`` whose name ends with one of
    ``config.batch.input_types`` is processed by ``run_single`` in a pool of
    ``config.batch.processes`` worker processes. Failures, skipped files and
    timing statistics are logged.

    :param config: The configuration.

    :return: None.
    """
    # str.endswith accepts a tuple of suffixes, so one call covers all types.
    input_types = tuple(config.batch.input_types)

    all_files = [
        f"{dataset}/{file}"
        for dataset in config.batch.datasets
        for file in os.listdir(dataset)
        if file.endswith(input_types)
    ]

    start = time.time()

    # Create savedir if it doesn't exist (no check-then-create race).
    os.makedirs(config.api.savedir, exist_ok=True)

    torch.multiprocessing.set_start_method("spawn", force=True)

    # Nothing to do: report it before paying for a worker pool.
    if not all_files:
        ulogger.info(
            f"No files with types in {config.batch.input_types} "
            "found in the directory"
        )
        return

    with multiprocessing.Pool(processes=config.batch.processes) as pool:
        chunks = [(config, file, True) for file in all_files]
        results = pool.starmap(run_single, chunks)

    # run_single returns None on success and a message otherwise.
    messages = [result for result in results if result is not None]

    if messages:
        # Separate genuine failures from "already processed" skip notices.
        errors = [
            message
            for message in messages
            if "already processed" not in message
        ]
        already_processed = len(messages) - len(errors)

        for error in errors:
            ulogger.info(error)

        ulogger.info(f"Errors: {len(errors)}")
        ulogger.info(f"Already processed: {already_processed}")

    elapsed = time.time() - start

    # Convert to minutes and seconds.
    minutes, seconds = divmod(elapsed, 60)
    ulogger.info(f"Time taken: {minutes:.0f}m {seconds:.0f}s")

    # Half to ignore the .nii files.
    # NOTE(review): every entry of all_files is passed to run_single, so
    # halving presumably assumes each scan is paired with a .nii file -
    # confirm this still holds for the configured input types.
    no_of_files = max(len(all_files) // 2, 1)

    # Print average time per file.
    ulogger.info(f"Average time per file: {elapsed / no_of_files:.2f}s")

    # Print number of files.
    ulogger.info(f"Number of files: {no_of_files}")

Run the retuve pipeline on a batch of files.

Parameters
  • config: The configuration.