retuve.app.utils
1import asyncio 2import hashlib 3import json 4import os 5import secrets 6from datetime import datetime 7 8from fastapi import ( 9 APIRouter, 10 Depends, 11 Header, 12 HTTPException, 13 Request, 14 Response, 15) 16from httpx import AsyncClient 17 18from retuve.funcs import retuve_run 19 20 21def generate_token(): 22 return secrets.token_urlsafe(32) 23 24 25def consistent_hash(value, mod): 26 # Convert the input to a string and encode it 27 encoded_value = str(value).encode() 28 # Use a reproducible hash function (e.g., SHA-256) 29 hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16) 30 return hash_value % mod 31 32 33TOKEN_STORE = {} 34API_TOKEN_STORE = {} 35 36 37async def save_dicom_and_get_results( 38 live_batchdir, instance_id, dicom_content, config 39): 40 # Save the DICOM file in the appropriate directory 41 dicom_path = f"{live_batchdir}/{instance_id}.dcm" 42 if dicom_content is not None: 43 # if path already exists, return 44 if os.path.exists(dicom_path): 45 return 46 with open(dicom_path, "wb") as f: 47 f.write(dicom_content) 48 49 # Process the DICOM 50 result = await asyncio.to_thread( 51 retuve_run, 52 hip_mode=config.batch.hip_mode, 53 config=config, 54 modes_func=config.batch.mode_func, 55 modes_func_kwargs_dict={}, 56 file=dicom_path, 57 ) 58 59 return result 60 61 62async def save_results(instance_id, savedir, result=None, just_paths=False): 63 """ 64 Saves DICOM content, video, image, and metrics results for a given instance ID. 65 66 :param instance_id: The unique ID of the DICOM instance. 67 :param result: The result object returned by `retuve_run`. 68 :param savedir: The base directory for saving the results. 69 :param just_paths: Whether to return just the paths of the saved results. 70 """ 71 result_dir = f"{savedir}/{instance_id}" 72 os.makedirs(result_dir, exist_ok=True) 73 74 video_path = f"{result_dir}/video.mp4" 75 img_path = f"{result_dir}/img.jpg" 76 metrics_path = f"{result_dir}/metrics.json" 77 78 if just_paths: 79 return video_path, img_path, metrics_path 80 81 # Save video result if available 82 if result.video_clip: 83 await asyncio.to_thread(result.video_clip.write_videofile, video_path) 84 85 # Save image result if available 86 if result.image: 87 await asyncio.to_thread(result.image.save, img_path) 88 89 # Save metrics if available 90 if result.metrics: 91 with open(metrics_path, "w") as f: 92 json.dump(result.metrics, f) 93 94 return video_path, img_path, metrics_path 95 96 97async def get_sorted_dicom_images( 98 orthanc_url, username=None, password=None, latest_time=None 99): 100 """ 101 Fetch and sort DICOM images from an Orthanc server based on acquisition time. 102 103 :param orthanc_url: The Orthanc server URL. 104 :param username: Username for authentication (optional). 105 :param password: Password for authentication (optional). 106 :param latest_time: Filter for acquisition times after this datetime. 107 :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime). 108 """ 109 110 auth = (username, password) if username and password else None 111 images_with_dates = [] 112 latest_acq_time = latest_time or datetime.min 113 114 async with AsyncClient() as client: 115 patients_response = await client.get( 116 f"{orthanc_url}/patients", auth=auth 117 ) 118 for patient_id in patients_response.json(): 119 studies_response = await client.get( 120 f"{orthanc_url}/patients/{patient_id}/studies", auth=auth 121 ) 122 for study in studies_response.json(): 123 study_id = study["ID"] 124 series_response = await client.get( 125 f"{orthanc_url}/studies/{study_id}/series", auth=auth 126 ) 127 for series in series_response.json(): 128 series_id = series["ID"] 129 instances_response = await client.get( 130 f"{orthanc_url}/series/{series_id}/instances", 131 auth=auth, 132 ) 133 for instance in instances_response.json(): 134 instance_id = instance["ID"] 135 metadata_response = await client.get( 136 f"{orthanc_url}/instances/{instance_id}/simplified-tags", 137 auth=auth, 138 ) 139 metadata = metadata_response.json() 140 141 # Remove files that are not multiframe US's 142 if not ( 143 metadata.get("SOPClassUID") 144 == "1.2.840.10008.5.1.4.1.1.3.1" 145 and int(metadata.get("NumberOfFrames", 0)) > 1 146 ): 147 continue 148 149 acq_date, acq_time = metadata.get( 150 "AcquisitionDate" 151 ), metadata.get("AcquisitionTime") 152 if acq_date and acq_time: 153 acq_datetime = datetime.strptime( 154 f"{acq_date}{acq_time.split('.')[0]}", 155 "%Y%m%d%H%M%S", 156 ) 157 158 images_with_dates.append( 159 (acq_datetime, instance_id, metadata) 160 ) 161 162 final_images_with_dates = [] 163 164 if len(images_with_dates) == 0: 165 return [], latest_acq_time 166 167 for acq_datetime, instance_id, metadata in images_with_dates: 168 if acq_datetime > latest_acq_time: 169 file_response = await client.get( 170 f"{orthanc_url}/instances/{instance_id}/file", 171 auth=auth, 172 ) 173 174 # hash the main instance_id to be smaller 175 small_id = consistent_hash(instance_id, 10**8) 176 patient_id_real = metadata.get("PatientID") 177 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 178 179 latest_acq_time = max(latest_acq_time, acq_datetime) 180 181 final_images_with_dates.append( 182 (acq_datetime, file_response.content, instance_id) 183 ) 184 185 images_with_dates.sort(key=lambda x: x[0]) 186 latest_image = images_with_dates[-1] 187 188 # inject file_response.content into latest_image 189 file_response = await client.get( 190 f"{orthanc_url}/instances/{latest_image[1]}/file", 191 auth=auth, 192 ) 193 194 instance_id = latest_image[1] 195 acq_datetime = latest_image[0] 196 metadata = latest_image[2] 197 small_id = consistent_hash(instance_id, 10**8) 198 patient_id_real = metadata.get("PatientID") 199 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 200 201 latest_image = ( 202 acq_datetime, 203 file_response.content, 204 instance_id, 205 ) 206 207 if not final_images_with_dates: 208 final_images_with_dates = [latest_image] 209 210 return [ 211 (image, instance_id) 212 for _, image, instance_id in final_images_with_dates 213 ], latest_acq_time 214 215 216class UnauthorizedException(Exception): 217 """Raised when the user is unauthorized and should be redirected.""" 218 219 pass 220 221 222def validate_auth_token(auth_token: str): 223 token_data = TOKEN_STORE.get(auth_token) 224 if not token_data or token_data["expires"] < datetime.utcnow(): 225 raise UnauthorizedException 226 return token_data["username"] 227 228 229def validate_api_token(api_token: str): 230 token_data = API_TOKEN_STORE.get(api_token) 231 if not token_data or token_data["expires"] < datetime.utcnow(): 232 raise HTTPException(status_code=403, detail="Invalid API token.") 233 return token_data["username"]
def
generate_token():
def
consistent_hash(value, mod):
TOKEN_STORE =
{}
API_TOKEN_STORE =
{}
async def
save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
38async def save_dicom_and_get_results( 39 live_batchdir, instance_id, dicom_content, config 40): 41 # Save the DICOM file in the appropriate directory 42 dicom_path = f"{live_batchdir}/{instance_id}.dcm" 43 if dicom_content is not None: 44 # if path already exists, return 45 if os.path.exists(dicom_path): 46 return 47 with open(dicom_path, "wb") as f: 48 f.write(dicom_content) 49 50 # Process the DICOM 51 result = await asyncio.to_thread( 52 retuve_run, 53 hip_mode=config.batch.hip_mode, 54 config=config, 55 modes_func=config.batch.mode_func, 56 modes_func_kwargs_dict={}, 57 file=dicom_path, 58 ) 59 60 return result
async def
save_results(instance_id, savedir, result=None, just_paths=False):
63async def save_results(instance_id, savedir, result=None, just_paths=False): 64 """ 65 Saves DICOM content, video, image, and metrics results for a given instance ID. 66 67 :param instance_id: The unique ID of the DICOM instance. 68 :param result: The result object returned by `retuve_run`. 69 :param savedir: The base directory for saving the results. 70 :param just_paths: Whether to return just the paths of the saved results. 71 """ 72 result_dir = f"{savedir}/{instance_id}" 73 os.makedirs(result_dir, exist_ok=True) 74 75 video_path = f"{result_dir}/video.mp4" 76 img_path = f"{result_dir}/img.jpg" 77 metrics_path = f"{result_dir}/metrics.json" 78 79 if just_paths: 80 return video_path, img_path, metrics_path 81 82 # Save video result if available 83 if result.video_clip: 84 await asyncio.to_thread(result.video_clip.write_videofile, video_path) 85 86 # Save image result if available 87 if result.image: 88 await asyncio.to_thread(result.image.save, img_path) 89 90 # Save metrics if available 91 if result.metrics: 92 with open(metrics_path, "w") as f: 93 json.dump(result.metrics, f) 94 95 return video_path, img_path, metrics_path
Saves DICOM content, video, image, and metrics results for a given instance ID.
Parameters
- instance_id: The unique ID of the DICOM instance.
- result: The result object returned by
retuve_run
. - savedir: The base directory for saving the results.
- just_paths: Whether to return just the paths of the saved results.
async def
get_sorted_dicom_images(orthanc_url, username=None, password=None, latest_time=None):
98async def get_sorted_dicom_images( 99 orthanc_url, username=None, password=None, latest_time=None 100): 101 """ 102 Fetch and sort DICOM images from an Orthanc server based on acquisition time. 103 104 :param orthanc_url: The Orthanc server URL. 105 :param username: Username for authentication (optional). 106 :param password: Password for authentication (optional). 107 :param latest_time: Filter for acquisition times after this datetime. 108 :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime). 109 """ 110 111 auth = (username, password) if username and password else None 112 images_with_dates = [] 113 latest_acq_time = latest_time or datetime.min 114 115 async with AsyncClient() as client: 116 patients_response = await client.get( 117 f"{orthanc_url}/patients", auth=auth 118 ) 119 for patient_id in patients_response.json(): 120 studies_response = await client.get( 121 f"{orthanc_url}/patients/{patient_id}/studies", auth=auth 122 ) 123 for study in studies_response.json(): 124 study_id = study["ID"] 125 series_response = await client.get( 126 f"{orthanc_url}/studies/{study_id}/series", auth=auth 127 ) 128 for series in series_response.json(): 129 series_id = series["ID"] 130 instances_response = await client.get( 131 f"{orthanc_url}/series/{series_id}/instances", 132 auth=auth, 133 ) 134 for instance in instances_response.json(): 135 instance_id = instance["ID"] 136 metadata_response = await client.get( 137 f"{orthanc_url}/instances/{instance_id}/simplified-tags", 138 auth=auth, 139 ) 140 metadata = metadata_response.json() 141 142 # Remove files that are not multiframe US's 143 if not ( 144 metadata.get("SOPClassUID") 145 == "1.2.840.10008.5.1.4.1.1.3.1" 146 and int(metadata.get("NumberOfFrames", 0)) > 1 147 ): 148 continue 149 150 acq_date, acq_time = metadata.get( 151 "AcquisitionDate" 152 ), metadata.get("AcquisitionTime") 153 if acq_date and acq_time: 154 acq_datetime = datetime.strptime( 155 f"{acq_date}{acq_time.split('.')[0]}", 156 "%Y%m%d%H%M%S", 157 ) 158 159 images_with_dates.append( 160 (acq_datetime, instance_id, metadata) 161 ) 162 163 final_images_with_dates = [] 164 165 if len(images_with_dates) == 0: 166 return [], latest_acq_time 167 168 for acq_datetime, instance_id, metadata in images_with_dates: 169 if acq_datetime > latest_acq_time: 170 file_response = await client.get( 171 f"{orthanc_url}/instances/{instance_id}/file", 172 auth=auth, 173 ) 174 175 # hash the main instance_id to be smaller 176 small_id = consistent_hash(instance_id, 10**8) 177 patient_id_real = metadata.get("PatientID") 178 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 179 180 latest_acq_time = max(latest_acq_time, acq_datetime) 181 182 final_images_with_dates.append( 183 (acq_datetime, file_response.content, instance_id) 184 ) 185 186 images_with_dates.sort(key=lambda x: x[0]) 187 latest_image = images_with_dates[-1] 188 189 # inject file_response.content into latest_image 190 file_response = await client.get( 191 f"{orthanc_url}/instances/{latest_image[1]}/file", 192 auth=auth, 193 ) 194 195 instance_id = latest_image[1] 196 acq_datetime = latest_image[0] 197 metadata = latest_image[2] 198 small_id = consistent_hash(instance_id, 10**8) 199 patient_id_real = metadata.get("PatientID") 200 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 201 202 latest_image = ( 203 acq_datetime, 204 file_response.content, 205 instance_id, 206 ) 207 208 if not final_images_with_dates: 209 final_images_with_dates = [latest_image] 210 211 return [ 212 (image, instance_id) 213 for _, image, instance_id in final_images_with_dates 214 ], latest_acq_time
Fetch and sort DICOM images from an Orthanc server based on acquisition time.
Parameters
- orthanc_url: The Orthanc server URL.
- username: Username for authentication (optional).
- password: Password for authentication (optional).
- latest_time: Filter for acquisition times after this datetime.
Returns
A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).
def
validate_auth_token(auth_token: str):
def
validate_api_token(api_token: str):