retuve.app.routes.live

The API for Retuve Live

  1# Copyright 2024 Adam McArthur
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16The API for Retuve Live
 17"""
 18
 19import asyncio
 20import logging
 21import os
 22import shutil
 23import traceback
 24from contextlib import asynccontextmanager
 25
 26from fastapi import APIRouter, Form, HTTPException, Request
 27
 28from retuve.app.classes import LiveResponse
 29from retuve.app.helpers import RESULTS_DIR, RESULTS_URL_ACCESS
 30from retuve.app.utils import (
 31    get_sorted_dicom_images,
 32    save_dicom_and_get_results,
 33    save_results,
 34    validate_api_token,
 35)
 36from retuve.keyphrases.config import Config
 37
 38logging.getLogger("httpx").setLevel(logging.CRITICAL)
 39
 40
 41class NoDicomFoundError(Exception):
 42    pass
 43
 44
# Create a global queue for DICOM processing.
# Items are (instance_id, dicom_content, config, live_savedir) tuples,
# consumed by the process_dicom_queue() background worker.
dicom_processing_queue = asyncio.Queue()

# Instance IDs the background worker has fully processed, so duplicate
# queue entries can be skipped without touching the filesystem.
processed_instance_ids = set()
 49
 50
 51async def process_dicom_queue():
 52    """
 53    Background task to process queued DICOM files and save results in `live_savedir`.
 54    """
 55    while True:
 56        dicom_data = await dicom_processing_queue.get()
 57        instance_id, dicom_content, config, live_savedir = dicom_data
 58
 59        if instance_id in processed_instance_ids:
 60            logging.info(f"Skipping already processed DICOM: {instance_id}")
 61            dicom_processing_queue.task_done()
 62            continue
 63
 64        try:
 65
 66            live_batchdir = config.batch.datasets[0]
 67
 68            # Define the directory for saving results
 69            result_dir = f"{live_savedir}/{instance_id}"
 70
 71            # check if metrics already exists
 72            if os.path.exists(f"{result_dir}/metrics.json"):
 73                logging.info(
 74                    f"Skipping already processed DICOM: {instance_id}"
 75                )
 76                dicom_processing_queue.task_done()
 77                continue
 78
 79            # Process the DICOM
 80            result = await save_dicom_and_get_results(
 81                live_batchdir, instance_id, dicom_content, config
 82            )
 83            if not result:
 84                dicom_processing_queue.task_done()
 85                continue
 86            await save_results(instance_id, live_savedir, result=result)
 87
 88            logging.info(
 89                f"Processed and saved results for DICOM: {instance_id} in {result_dir}"
 90            )
 91
 92            processed_instance_ids.add(instance_id)
 93
 94        except Exception as e:
 95            logging.error(
 96                f"Error processing DICOM {instance_id}: {traceback.format_exc()}"
 97            )
 98        finally:
 99            dicom_processing_queue.task_done()
100
101
async def constantly_delete_temp_dirs(config):
    """
    Background task that periodically wipes the temporary results directory.

    Only active when ``config.zero_trust`` is truthy; otherwise returns
    immediately. Once started it runs forever, sleeping
    ``config.zero_trust_interval`` seconds between passes.
    """
    if not config.zero_trust:
        # Zero-trust mode disabled: nothing to clean up, ever.
        return

    def _wipe_results_dir():
        # Recreate the results directory from scratch.
        if os.path.exists(RESULTS_DIR):
            shutil.rmtree(RESULTS_DIR)
            os.makedirs(RESULTS_DIR, exist_ok=True)

    while True:
        _wipe_results_dir()
        await asyncio.sleep(config.zero_trust_interval)
116
117
@asynccontextmanager
async def lifespan(app):
    """
    FastAPI lifespan context: start the background workers once the event
    loop is running, and cancel them on shutdown.
    """
    # Create the tasks once the event loop is running.
    queue_task = asyncio.create_task(process_dicom_queue())
    # BUG FIX: constantly_delete_temp_dirs expects the live Config object
    # (it reads config.zero_trust and config.zero_trust_interval), but was
    # being handed Config.live_config.name — the name attribute — which
    # would raise AttributeError inside the background task.
    cleanup_task = asyncio.create_task(
        constantly_delete_temp_dirs(Config.live_config)
    )

    yield

    queue_task.cancel()
    cleanup_task.cancel()
130
131
# Router wired to the lifespan above so the background tasks start and
# stop together with the application.
router = APIRouter(lifespan=lifespan)
134
@router.post(
    "/api/live/",
    response_model=LiveResponse,
    responses={
        204: {
            "description": "No Content - Retuve Run likely to be in progress."
        },
        400: {"description": "Invalid file type. Expected a DICOM file."},
        422: {"description": "No DICOM images found on the Orthanc server."},
        500: {"description": "Internal Server Error"},
    },
)
async def analyse_image(
    request: Request,
    keyphrase: str = Form(...),
):
    """
    Analyze the latest DICOM from Orthanc with the Retuve model selected
    by the keyphrase provided.

    :param request: The request object (carries app-level caches/config).
    :param keyphrase: The keyphrase to be used for analysis.
    :raises HTTPException: 204 while a run is likely in progress, 422 when
        no DICOM images are available, 500 on unexpected errors.
    """

    api_token = request.cookies.get("api_token")
    validate_api_token(api_token)
    instance_id = "Unknown"

    # BUG FIX: bind a fallback logger *before* the try block. The
    # per-config hippa_logger is only assigned inside the try, so any
    # failure before that point (e.g. an unknown keyphrase in
    # Config.get_config) previously hit UnboundLocalError in the generic
    # exception handler, masking the real error and skipping logging.
    hippa_logger = logging.getLogger(__name__)

    try:
        config = Config.get_config(keyphrase)

        if config.api.zero_trust:
            # Zero-trust: everything lives in the ephemeral results dir.
            live_savedir = RESULTS_DIR
            live_batchdir = RESULTS_DIR
        else:
            live_savedir = config.api.savedir
            live_batchdir = config.batch.datasets[0]

        hippa_logger = request.app.config[config.name].hippa_logger

        # Fetch all DICOMs newer than the last poll; newest is last.
        dicoms, request.app.latest_time = await get_sorted_dicom_images(
            config.api.orthanc_url,
            config.api.orthanc_username,
            config.api.orthanc_password,
            latest_time=request.app.latest_time,
        )

        if dicoms:
            # Get the latest DICOM (the last one in the sorted list).
            latest_dicom, instance_id = dicoms[-1]
        else:
            raise NoDicomFoundError(
                "No DICOM images found on the Orthanc server."
            )

        # Soft cache: same instance as last request -> reuse the in-memory
        # response (never in zero-trust mode).
        if (
            request.app.instance_id_cache == instance_id
            and not config.api.zero_trust
        ):
            hippa_logger.debug(
                f"Retuve Run for {instance_id} with keyphrase {keyphrase} "
                f"from host: {request.client.host} has been soft-cached."
            )
            if not request.app.model_response_cache:
                # A run for this instance is still in progress.
                raise HTTPException(
                    status_code=204,
                )

            return request.app.model_response_cache

        request.app.instance_id_cache = instance_id

        # Hard cache: results already on disk -> serve saved artifacts.
        if os.path.exists(f"{live_savedir}/{instance_id}"):

            video_path, img_path, _ = await save_results(
                instance_id, live_savedir, just_paths=True
            )

            # Rewrite filesystem paths into client-fetchable URLs.
            video_path = video_path.replace(
                live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
            )
            img_path = img_path.replace(
                live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
            )

            request.app.model_response_cache = LiveResponse(
                file_id=instance_id,
                video_url=video_path,
                img_url=img_path,
            )

            hippa_logger.debug(
                f"Retuve Run for {instance_id} with keyphrase {keyphrase} "
                f"from host: {request.client.host} has been hard-cached."
            )

            return request.app.model_response_cache

        # No cache hit: run the model on the latest DICOM and save results.
        result = await save_dicom_and_get_results(
            live_batchdir, instance_id, latest_dicom, config
        )

        video_path, img_path, _ = await save_results(
            instance_id, live_savedir, result=result
        )

        video_path = video_path.replace(
            live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
        )
        img_path = img_path.replace(
            live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
        )

        # Mark success in hippa logs
        hippa_logger.info(
            f"Successfully processed {instance_id} with keyphrase {keyphrase} "
            f"from host: {request.client.host}."
        )

        request.app.model_response_cache = LiveResponse(
            file_id=instance_id,
            video_url=video_path if video_path else "",
            img_url=img_path if img_path else "",
        )

        # check if there is a valid response, otherwise raise a 500 error
        if not request.app.model_response_cache:
            raise HTTPException(
                status_code=500, detail="Internal Server Error, check logs."
            )

        if config.api.zero_trust:
            # Zero-trust never queues background work.
            return request.app.model_response_cache

        # Queue the remaining (older) DICOMs for background processing
        for dicom, dicom_id in dicoms[:-1]:
            await dicom_processing_queue.put(
                (dicom_id, dicom, config, live_savedir)
            )

        return request.app.model_response_cache

    except NoDicomFoundError as e:
        # raise a 422 error
        raise HTTPException(status_code=422, detail=str(e))

    except HTTPException:
        # Allow expected HTTP exceptions to pass through without wrapping
        # them in a 500.
        raise

    except Exception:
        hippa_logger.error(
            f"Error in processing {instance_id} with keyphrase {keyphrase} "
            f"from host: {request.client.host}."
        )
        hippa_logger.error(traceback.format_exc())
        raise HTTPException(
            status_code=500, detail="Internal Server Error, check logs."
        )
class NoDicomFoundError(builtins.Exception):
42class NoDicomFoundError(Exception):
43    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
dicom_processing_queue = <Queue at 0x7ff62c793af0 maxsize=0>
processed_instance_ids = set()
async def process_dicom_queue():
 52async def process_dicom_queue():
 53    """
 54    Background task to process queued DICOM files and save results in `live_savedir`.
 55    """
 56    while True:
 57        dicom_data = await dicom_processing_queue.get()
 58        instance_id, dicom_content, config, live_savedir = dicom_data
 59
 60        if instance_id in processed_instance_ids:
 61            logging.info(f"Skipping already processed DICOM: {instance_id}")
 62            dicom_processing_queue.task_done()
 63            continue
 64
 65        try:
 66
 67            live_batchdir = config.batch.datasets[0]
 68
 69            # Define the directory for saving results
 70            result_dir = f"{live_savedir}/{instance_id}"
 71
 72            # check if metrics already exists
 73            if os.path.exists(f"{result_dir}/metrics.json"):
 74                logging.info(
 75                    f"Skipping already processed DICOM: {instance_id}"
 76                )
 77                dicom_processing_queue.task_done()
 78                continue
 79
 80            # Process the DICOM
 81            result = await save_dicom_and_get_results(
 82                live_batchdir, instance_id, dicom_content, config
 83            )
 84            if not result:
 85                dicom_processing_queue.task_done()
 86                continue
 87            await save_results(instance_id, live_savedir, result=result)
 88
 89            logging.info(
 90                f"Processed and saved results for DICOM: {instance_id} in {result_dir}"
 91            )
 92
 93            processed_instance_ids.add(instance_id)
 94
 95        except Exception as e:
 96            logging.error(
 97                f"Error processing DICOM {instance_id}: {traceback.format_exc()}"
 98            )
 99        finally:
100            dicom_processing_queue.task_done()

Background task to process queued DICOM files and save results in live_savedir.

async def constantly_delete_temp_dirs(config):
103async def constantly_delete_temp_dirs(config):
104    """
105    Background task to constantly delete temporary directories.
106    """
107    if not config.zero_trust:
108        return
109
110    while True:
111        # Delete the temporary directory
112        if os.path.exists(RESULTS_DIR):
113            shutil.rmtree(RESULTS_DIR)
114            os.makedirs(RESULTS_DIR, exist_ok=True)
115
116        await asyncio.sleep(config.zero_trust_interval)

Background task to constantly delete temporary directories.

@asynccontextmanager
async def lifespan(app):
119@asynccontextmanager
120async def lifespan(app):
121    # Create the task once the event loop is running
122    task = asyncio.create_task(process_dicom_queue())
123    task2 = asyncio.create_task(
124        constantly_delete_temp_dirs(Config.live_config.name)
125    )
126
127    yield
128
129    task.cancel()
130    task2.cancel()
router = <fastapi.routing.APIRouter object>
@router.post('/api/live/', response_model=LiveResponse, responses={204: {'description': 'No Content - Retuve Run likely to be in progress.'}, 400: {'description': 'Invalid file type. Expected a DICOM file.'}, 422: {'description': 'No DICOM images found on the Orthanc server.'}, 500: {'description': 'Internal Server Error'}})
async def analyse_image( request: starlette.requests.Request, keyphrase: str = Form(PydanticUndefined)):
136@router.post(
137    "/api/live/",
138    response_model=LiveResponse,
139    responses={
140        204: {
141            "description": "No Content - Retuve Run likely to be in progress."
142        },
143        400: {"description": "Invalid file type. Expected a DICOM file."},
144        422: {"description": "No DICOM images found on the Orthanc server."},
145        500: {"description": "Internal Server Error"},
146    },
147)
148async def analyse_image(
149    request: Request,
150    keyphrase: str = Form(...),
151):
152    """
153    Analyze a file with a Retuve Model based on the keyphrase provided.
154
155    :param request: The request object.
156    :param keyphrase: The keyphrase to be used for analysis.
157    :raises HTTPException: If the file or keyphrase is invalid.
158    """
159
160    api_token = request.cookies.get("api_token")
161    validate_api_token(api_token)
162    instance_id = "Unknown"
163
164    try:
165        config = Config.get_config(keyphrase)
166
167        if config.api.zero_trust:
168            live_savedir = RESULTS_DIR
169            live_batchdir = RESULTS_DIR
170        else:
171            live_savedir = config.api.savedir
172            live_batchdir = config.batch.datasets[0]
173
174        hippa_logger = request.app.config[config.name].hippa_logger
175
176        # Usage example: Fetch all DICOMs and find the latest one
177        dicoms, request.app.latest_time = await get_sorted_dicom_images(
178            config.api.orthanc_url,
179            config.api.orthanc_username,
180            config.api.orthanc_password,
181            latest_time=request.app.latest_time,
182        )
183
184        if dicoms:
185            # Get the latest DICOM (the last one in the sorted list)
186            latest_dicom, instance_id = dicoms[-1]
187        else:
188            raise NoDicomFoundError(
189                "No DICOM images found on the Orthanc server."
190            )
191
192        if (
193            request.app.instance_id_cache == instance_id
194            and not config.api.zero_trust
195        ):
196            hippa_logger.debug(
197                f"Retuve Run for {instance_id} with keyphrase {keyphrase} "
198                f"from host: {request.client.host} has been soft-cached."
199            )
200            if not request.app.model_response_cache:
201                raise HTTPException(
202                    status_code=204,
203                )
204
205            return request.app.model_response_cache
206
207        request.app.instance_id_cache = instance_id
208
209        if os.path.exists(f"{live_savedir}/{instance_id}"):
210
211            video_path, img_path, _ = await save_results(
212                instance_id, live_savedir, just_paths=True
213            )
214
215            video_path = video_path.replace(
216                live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
217            )
218            img_path = img_path.replace(
219                live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
220            )
221
222            request.app.model_response_cache = LiveResponse(
223                file_id=instance_id,
224                video_url=video_path,
225                img_url=img_path,
226            )
227
228            hippa_logger.debug(
229                f"Retuve Run for {instance_id} with keyphrase {keyphrase} "
230                f"from host: {request.client.host} has been hard-cached."
231            )
232
233            return request.app.model_response_cache
234
235        result = await save_dicom_and_get_results(
236            live_batchdir, instance_id, latest_dicom, config
237        )
238
239        video_path, img_path, _ = await save_results(
240            instance_id, live_savedir, result=result
241        )
242
243        video_path = video_path.replace(
244            live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
245        )
246        img_path = img_path.replace(
247            live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}"
248        )
249
250        # Mark success in hippa logs
251        hippa_logger.info(
252            f"Successfully processed {instance_id} with keyphrase {keyphrase} "
253            f"from host: {request.client.host}."
254        )
255
256        request.app.model_response_cache = LiveResponse(
257            file_id=instance_id,
258            video_url=video_path if video_path else "",
259            img_url=img_path if img_path else "",
260        )
261
262        # check if there is a valid response, otherwise raise a 500 error
263        if not request.app.model_response_cache:
264            raise HTTPException(
265                status_code=500, detail="Internal Server Error, check logs."
266            )
267
268        if config.api.zero_trust:
269            return request.app.model_response_cache
270
271        # Queue the remaining DICOMs for background processing
272        for dicom, dicom_id in dicoms[:-1]:
273            await dicom_processing_queue.put(
274                (dicom_id, dicom, config, live_savedir)
275            )
276
277        return request.app.model_response_cache
278
279    except NoDicomFoundError as e:
280        # raise a 422 error
281        raise HTTPException(status_code=422, detail=str(e))
282
283    except HTTPException as http_exc:
284        # Allow expected HTTP exceptions to pass through without wrapping in a 500
285        raise http_exc
286
287    except Exception as e:
288        hippa_logger.error(
289            f"Error in processing {instance_id} with keyphrase {keyphrase} "
290            f"from host: {request.client.host}."
291        )
292        hippa_logger.error(traceback.format_exc())
293        raise HTTPException(
294            status_code=500, detail="Internal Server Error, check logs."
295        )

Analyze a file with a Retuve Model based on the keyphrase provided.

Parameters
  • request: The request object.
  • keyphrase: The keyphrase to be used for analysis.
Raises
  • HTTPException: If the file or keyphrase is invalid.