retuve.app.routes.live
The API for Retuve Live
1# Copyright 2024 Adam McArthur 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16The API for Retuve Live 17""" 18 19import asyncio 20import logging 21import os 22import shutil 23import traceback 24from contextlib import asynccontextmanager 25 26from fastapi import APIRouter, Form, HTTPException, Request 27 28from retuve.app.classes import LiveResponse 29from retuve.app.helpers import RESULTS_DIR, RESULTS_URL_ACCESS 30from retuve.app.utils import ( 31 get_sorted_dicom_images, 32 save_dicom_and_get_results, 33 save_results, 34 validate_api_token, 35) 36from retuve.keyphrases.config import Config 37 38logging.getLogger("httpx").setLevel(logging.CRITICAL) 39 40 41class NoDicomFoundError(Exception): 42 pass 43 44 45# Create a global queue for DICOM processing 46dicom_processing_queue = asyncio.Queue() 47 48processed_instance_ids = set() 49 50 51async def process_dicom_queue(): 52 """ 53 Background task to process queued DICOM files and save results in `live_savedir`. 54 """ 55 while True: 56 dicom_data = await dicom_processing_queue.get() 57 instance_id, dicom_content, config, live_savedir = dicom_data 58 59 if instance_id in processed_instance_ids: 60 logging.info(f"Skipping already processed DICOM: {instance_id}") 61 dicom_processing_queue.task_done() 62 continue 63 64 try: 65 66 live_batchdir = config.batch.datasets[0] 67 68 # Define the directory for saving results 69 result_dir = f"{live_savedir}/{instance_id}" 70 71 # check if metrics already exists 72 if os.path.exists(f"{result_dir}/metrics.json"): 73 logging.info( 74 f"Skipping already processed DICOM: {instance_id}" 75 ) 76 dicom_processing_queue.task_done() 77 continue 78 79 # Process the DICOM 80 result = await save_dicom_and_get_results( 81 live_batchdir, instance_id, dicom_content, config 82 ) 83 if not result: 84 dicom_processing_queue.task_done() 85 continue 86 await save_results(instance_id, live_savedir, result=result) 87 88 logging.info( 89 f"Processed and saved results for DICOM: {instance_id} in {result_dir}" 90 ) 91 92 processed_instance_ids.add(instance_id) 93 94 except Exception as e: 95 logging.error( 96 f"Error processing DICOM {instance_id}: {traceback.format_exc()}" 97 ) 98 finally: 99 dicom_processing_queue.task_done() 100 101 102async def constantly_delete_temp_dirs(config): 103 """ 104 Background task to constantly delete temporary directories. 105 """ 106 if not config.zero_trust: 107 return 108 109 while True: 110 # Delete the temporary directory 111 if os.path.exists(RESULTS_DIR): 112 shutil.rmtree(RESULTS_DIR) 113 os.makedirs(RESULTS_DIR, exist_ok=True) 114 115 await asyncio.sleep(config.zero_trust_interval) 116 117 118@asynccontextmanager 119async def lifespan(app): 120 # Create the task once the event loop is running 121 task = asyncio.create_task(process_dicom_queue()) 122 task2 = asyncio.create_task( 123 constantly_delete_temp_dirs(Config.live_config.name) 124 ) 125 126 yield 127 128 task.cancel() 129 task2.cancel() 130 131 132router = APIRouter(lifespan=lifespan) 133 134 135@router.post( 136 "/api/live/", 137 response_model=LiveResponse, 138 responses={ 139 204: { 140 "description": "No Content - Retuve Run likely to be in progress." 141 }, 142 400: {"description": "Invalid file type. Expected a DICOM file."}, 143 422: {"description": "No DICOM images found on the Orthanc server."}, 144 500: {"description": "Internal Server Error"}, 145 }, 146) 147async def analyse_image( 148 request: Request, 149 keyphrase: str = Form(...), 150): 151 """ 152 Analyze a file with a Retuve Model based on the keyphrase provided. 153 154 :param request: The request object. 155 :param keyphrase: The keyphrase to be used for analysis. 156 :raises HTTPException: If the file or keyphrase is invalid. 157 """ 158 159 api_token = request.cookies.get("api_token") 160 validate_api_token(api_token) 161 instance_id = "Unknown" 162 163 try: 164 config = Config.get_config(keyphrase) 165 166 if config.api.zero_trust: 167 live_savedir = RESULTS_DIR 168 live_batchdir = RESULTS_DIR 169 else: 170 live_savedir = config.api.savedir 171 live_batchdir = config.batch.datasets[0] 172 173 hippa_logger = request.app.config[config.name].hippa_logger 174 175 # Usage example: Fetch all DICOMs and find the latest one 176 dicoms, request.app.latest_time = await get_sorted_dicom_images( 177 config.api.orthanc_url, 178 config.api.orthanc_username, 179 config.api.orthanc_password, 180 latest_time=request.app.latest_time, 181 ) 182 183 if dicoms: 184 # Get the latest DICOM (the last one in the sorted list) 185 latest_dicom, instance_id = dicoms[-1] 186 else: 187 raise NoDicomFoundError( 188 "No DICOM images found on the Orthanc server." 189 ) 190 191 if ( 192 request.app.instance_id_cache == instance_id 193 and not config.api.zero_trust 194 ): 195 hippa_logger.debug( 196 f"Retuve Run for {instance_id} with keyphrase {keyphrase} " 197 f"from host: {request.client.host} has been soft-cached." 198 ) 199 if not request.app.model_response_cache: 200 raise HTTPException( 201 status_code=204, 202 ) 203 204 return request.app.model_response_cache 205 206 request.app.instance_id_cache = instance_id 207 208 if os.path.exists(f"{live_savedir}/{instance_id}"): 209 210 video_path, img_path, _ = await save_results( 211 instance_id, live_savedir, just_paths=True 212 ) 213 214 video_path = video_path.replace( 215 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 216 ) 217 img_path = img_path.replace( 218 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 219 ) 220 221 request.app.model_response_cache = LiveResponse( 222 file_id=instance_id, 223 video_url=video_path, 224 img_url=img_path, 225 ) 226 227 hippa_logger.debug( 228 f"Retuve Run for {instance_id} with keyphrase {keyphrase} " 229 f"from host: {request.client.host} has been hard-cached." 230 ) 231 232 return request.app.model_response_cache 233 234 result = await save_dicom_and_get_results( 235 live_batchdir, instance_id, latest_dicom, config 236 ) 237 238 video_path, img_path, _ = await save_results( 239 instance_id, live_savedir, result=result 240 ) 241 242 video_path = video_path.replace( 243 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 244 ) 245 img_path = img_path.replace( 246 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 247 ) 248 249 # Mark success in hippa logs 250 hippa_logger.info( 251 f"Successfully processed {instance_id} with keyphrase {keyphrase} " 252 f"from host: {request.client.host}." 253 ) 254 255 request.app.model_response_cache = LiveResponse( 256 file_id=instance_id, 257 video_url=video_path if video_path else "", 258 img_url=img_path if img_path else "", 259 ) 260 261 # check if there is a valid response, otherwise raise a 500 error 262 if not request.app.model_response_cache: 263 raise HTTPException( 264 status_code=500, detail="Internal Server Error, check logs." 265 ) 266 267 if config.api.zero_trust: 268 return request.app.model_response_cache 269 270 # Queue the remaining DICOMs for background processing 271 for dicom, dicom_id in dicoms[:-1]: 272 await dicom_processing_queue.put( 273 (dicom_id, dicom, config, live_savedir) 274 ) 275 276 return request.app.model_response_cache 277 278 except NoDicomFoundError as e: 279 # raise a 422 error 280 raise HTTPException(status_code=422, detail=str(e)) 281 282 except HTTPException as http_exc: 283 # Allow expected HTTP exceptions to pass through without wrapping in a 500 284 raise http_exc 285 286 except Exception as e: 287 hippa_logger.error( 288 f"Error in processing {instance_id} with keyphrase {keyphrase} " 289 f"from host: {request.client.host}." 290 ) 291 hippa_logger.error(traceback.format_exc()) 292 raise HTTPException( 293 status_code=500, detail="Internal Server Error, check logs." 294 )
class
NoDicomFoundError(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
dicom_processing_queue =
<Queue at 0x7ff62c793af0 maxsize=0>
processed_instance_ids =
set()
async def
process_dicom_queue():
52async def process_dicom_queue(): 53 """ 54 Background task to process queued DICOM files and save results in `live_savedir`. 55 """ 56 while True: 57 dicom_data = await dicom_processing_queue.get() 58 instance_id, dicom_content, config, live_savedir = dicom_data 59 60 if instance_id in processed_instance_ids: 61 logging.info(f"Skipping already processed DICOM: {instance_id}") 62 dicom_processing_queue.task_done() 63 continue 64 65 try: 66 67 live_batchdir = config.batch.datasets[0] 68 69 # Define the directory for saving results 70 result_dir = f"{live_savedir}/{instance_id}" 71 72 # check if metrics already exists 73 if os.path.exists(f"{result_dir}/metrics.json"): 74 logging.info( 75 f"Skipping already processed DICOM: {instance_id}" 76 ) 77 dicom_processing_queue.task_done() 78 continue 79 80 # Process the DICOM 81 result = await save_dicom_and_get_results( 82 live_batchdir, instance_id, dicom_content, config 83 ) 84 if not result: 85 dicom_processing_queue.task_done() 86 continue 87 await save_results(instance_id, live_savedir, result=result) 88 89 logging.info( 90 f"Processed and saved results for DICOM: {instance_id} in {result_dir}" 91 ) 92 93 processed_instance_ids.add(instance_id) 94 95 except Exception as e: 96 logging.error( 97 f"Error processing DICOM {instance_id}: {traceback.format_exc()}" 98 ) 99 finally: 100 dicom_processing_queue.task_done()
Background task to process queued DICOM files and save results in live_savedir
.
async def
constantly_delete_temp_dirs(config):
103async def constantly_delete_temp_dirs(config): 104 """ 105 Background task to constantly delete temporary directories. 106 """ 107 if not config.zero_trust: 108 return 109 110 while True: 111 # Delete the temporary directory 112 if os.path.exists(RESULTS_DIR): 113 shutil.rmtree(RESULTS_DIR) 114 os.makedirs(RESULTS_DIR, exist_ok=True) 115 116 await asyncio.sleep(config.zero_trust_interval)
Background task to constantly delete temporary directories.
@asynccontextmanager
async def
lifespan(app):
119@asynccontextmanager 120async def lifespan(app): 121 # Create the task once the event loop is running 122 task = asyncio.create_task(process_dicom_queue()) 123 task2 = asyncio.create_task( 124 constantly_delete_temp_dirs(Config.live_config.name) 125 ) 126 127 yield 128 129 task.cancel() 130 task2.cancel()
router =
<fastapi.routing.APIRouter object>
@router.post('/api/live/', response_model=LiveResponse, responses={204: {'description': 'No Content - Retuve Run likely to be in progress.'}, 400: {'description': 'Invalid file type. Expected a DICOM file.'}, 422: {'description': 'No DICOM images found on the Orthanc server.'}, 500: {'description': 'Internal Server Error'}})
async def
analyse_image( request: starlette.requests.Request, keyphrase: str = Form(PydanticUndefined)):
136@router.post( 137 "/api/live/", 138 response_model=LiveResponse, 139 responses={ 140 204: { 141 "description": "No Content - Retuve Run likely to be in progress." 142 }, 143 400: {"description": "Invalid file type. Expected a DICOM file."}, 144 422: {"description": "No DICOM images found on the Orthanc server."}, 145 500: {"description": "Internal Server Error"}, 146 }, 147) 148async def analyse_image( 149 request: Request, 150 keyphrase: str = Form(...), 151): 152 """ 153 Analyze a file with a Retuve Model based on the keyphrase provided. 154 155 :param request: The request object. 156 :param keyphrase: The keyphrase to be used for analysis. 157 :raises HTTPException: If the file or keyphrase is invalid. 158 """ 159 160 api_token = request.cookies.get("api_token") 161 validate_api_token(api_token) 162 instance_id = "Unknown" 163 164 try: 165 config = Config.get_config(keyphrase) 166 167 if config.api.zero_trust: 168 live_savedir = RESULTS_DIR 169 live_batchdir = RESULTS_DIR 170 else: 171 live_savedir = config.api.savedir 172 live_batchdir = config.batch.datasets[0] 173 174 hippa_logger = request.app.config[config.name].hippa_logger 175 176 # Usage example: Fetch all DICOMs and find the latest one 177 dicoms, request.app.latest_time = await get_sorted_dicom_images( 178 config.api.orthanc_url, 179 config.api.orthanc_username, 180 config.api.orthanc_password, 181 latest_time=request.app.latest_time, 182 ) 183 184 if dicoms: 185 # Get the latest DICOM (the last one in the sorted list) 186 latest_dicom, instance_id = dicoms[-1] 187 else: 188 raise NoDicomFoundError( 189 "No DICOM images found on the Orthanc server." 190 ) 191 192 if ( 193 request.app.instance_id_cache == instance_id 194 and not config.api.zero_trust 195 ): 196 hippa_logger.debug( 197 f"Retuve Run for {instance_id} with keyphrase {keyphrase} " 198 f"from host: {request.client.host} has been soft-cached." 199 ) 200 if not request.app.model_response_cache: 201 raise HTTPException( 202 status_code=204, 203 ) 204 205 return request.app.model_response_cache 206 207 request.app.instance_id_cache = instance_id 208 209 if os.path.exists(f"{live_savedir}/{instance_id}"): 210 211 video_path, img_path, _ = await save_results( 212 instance_id, live_savedir, just_paths=True 213 ) 214 215 video_path = video_path.replace( 216 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 217 ) 218 img_path = img_path.replace( 219 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 220 ) 221 222 request.app.model_response_cache = LiveResponse( 223 file_id=instance_id, 224 video_url=video_path, 225 img_url=img_path, 226 ) 227 228 hippa_logger.debug( 229 f"Retuve Run for {instance_id} with keyphrase {keyphrase} " 230 f"from host: {request.client.host} has been hard-cached." 231 ) 232 233 return request.app.model_response_cache 234 235 result = await save_dicom_and_get_results( 236 live_batchdir, instance_id, latest_dicom, config 237 ) 238 239 video_path, img_path, _ = await save_results( 240 instance_id, live_savedir, result=result 241 ) 242 243 video_path = video_path.replace( 244 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 245 ) 246 img_path = img_path.replace( 247 live_savedir, f"{RESULTS_URL_ACCESS}/{config.name}" 248 ) 249 250 # Mark success in hippa logs 251 hippa_logger.info( 252 f"Successfully processed {instance_id} with keyphrase {keyphrase} " 253 f"from host: {request.client.host}." 254 ) 255 256 request.app.model_response_cache = LiveResponse( 257 file_id=instance_id, 258 video_url=video_path if video_path else "", 259 img_url=img_path if img_path else "", 260 ) 261 262 # check if there is a valid response, otherwise raise a 500 error 263 if not request.app.model_response_cache: 264 raise HTTPException( 265 status_code=500, detail="Internal Server Error, check logs." 266 ) 267 268 if config.api.zero_trust: 269 return request.app.model_response_cache 270 271 # Queue the remaining DICOMs for background processing 272 for dicom, dicom_id in dicoms[:-1]: 273 await dicom_processing_queue.put( 274 (dicom_id, dicom, config, live_savedir) 275 ) 276 277 return request.app.model_response_cache 278 279 except NoDicomFoundError as e: 280 # raise a 422 error 281 raise HTTPException(status_code=422, detail=str(e)) 282 283 except HTTPException as http_exc: 284 # Allow expected HTTP exceptions to pass through without wrapping in a 500 285 raise http_exc 286 287 except Exception as e: 288 hippa_logger.error( 289 f"Error in processing {instance_id} with keyphrase {keyphrase} " 290 f"from host: {request.client.host}." 291 ) 292 hippa_logger.error(traceback.format_exc()) 293 raise HTTPException( 294 status_code=500, detail="Internal Server Error, check logs." 295 )
Analyze a file with a Retuve Model based on the keyphrase provided.
Parameters
- request: The request object.
- keyphrase: The keyphrase to be used for analysis.
Raises
- HTTPException: If the file or keyphrase is invalid.