retuve.app.utils

  1import asyncio
  2import hashlib
  3import json
  4import os
  5import secrets
  6from datetime import datetime
  7
  8from fastapi import (
  9    APIRouter,
 10    Depends,
 11    Header,
 12    HTTPException,
 13    Request,
 14    Response,
 15)
 16from httpx import AsyncClient
 17
 18from retuve.funcs import retuve_run
 19
 20
 21def generate_token():
 22    return secrets.token_urlsafe(32)
 23
 24
 25def consistent_hash(value, mod):
 26    # Convert the input to a string and encode it
 27    encoded_value = str(value).encode()
 28    # Use a reproducible hash function (e.g., SHA-256)
 29    hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16)
 30    return hash_value % mod
 31
 32
 33TOKEN_STORE = {}
 34API_TOKEN_STORE = {}
 35
 36
 37async def save_dicom_and_get_results(
 38    live_batchdir, instance_id, dicom_content, config
 39):
 40    # Save the DICOM file in the appropriate directory
 41    dicom_path = f"{live_batchdir}/{instance_id}.dcm"
 42    if dicom_content is not None:
 43        # if path already exists, return
 44        if os.path.exists(dicom_path):
 45            return
 46        with open(dicom_path, "wb") as f:
 47            f.write(dicom_content)
 48
 49    # Process the DICOM
 50    result = await asyncio.to_thread(
 51        retuve_run,
 52        hip_mode=config.batch.hip_mode,
 53        config=config,
 54        modes_func=config.batch.mode_func,
 55        modes_func_kwargs_dict={},
 56        file=dicom_path,
 57    )
 58
 59    return result
 60
 61
 62async def save_results(instance_id, savedir, result=None, just_paths=False):
 63    """
 64    Saves DICOM content, video, image, and metrics results for a given instance ID.
 65
 66    :param instance_id: The unique ID of the DICOM instance.
 67    :param result: The result object returned by `retuve_run`.
 68    :param savedir: The base directory for saving the results.
 69    :param just_paths: Whether to return just the paths of the saved results.
 70    """
 71    result_dir = f"{savedir}/{instance_id}"
 72    os.makedirs(result_dir, exist_ok=True)
 73
 74    video_path = f"{result_dir}/video.mp4"
 75    img_path = f"{result_dir}/img.jpg"
 76    metrics_path = f"{result_dir}/metrics.json"
 77
 78    if just_paths:
 79        return video_path, img_path, metrics_path
 80
 81    # Save video result if available
 82    if result.video_clip:
 83        await asyncio.to_thread(result.video_clip.write_videofile, video_path)
 84
 85    # Save image result if available
 86    if result.image:
 87        await asyncio.to_thread(result.image.save, img_path)
 88
 89    # Save metrics if available
 90    if result.metrics:
 91        with open(metrics_path, "w") as f:
 92            json.dump(result.metrics, f)
 93
 94    return video_path, img_path, metrics_path
 95
 96
 97async def get_sorted_dicom_images(
 98    orthanc_url, username=None, password=None, latest_time=None
 99):
100    """
101    Fetch and sort DICOM images from an Orthanc server based on acquisition time.
102
103    :param orthanc_url: The Orthanc server URL.
104    :param username: Username for authentication (optional).
105    :param password: Password for authentication (optional).
106    :param latest_time: Filter for acquisition times after this datetime.
107    :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).
108    """
109
110    auth = (username, password) if username and password else None
111    images_with_dates = []
112    latest_acq_time = latest_time or datetime.min
113
114    async with AsyncClient() as client:
115        patients_response = await client.get(
116            f"{orthanc_url}/patients", auth=auth
117        )
118        for patient_id in patients_response.json():
119            studies_response = await client.get(
120                f"{orthanc_url}/patients/{patient_id}/studies", auth=auth
121            )
122            for study in studies_response.json():
123                study_id = study["ID"]
124                series_response = await client.get(
125                    f"{orthanc_url}/studies/{study_id}/series", auth=auth
126                )
127                for series in series_response.json():
128                    series_id = series["ID"]
129                    instances_response = await client.get(
130                        f"{orthanc_url}/series/{series_id}/instances",
131                        auth=auth,
132                    )
133                    for instance in instances_response.json():
134                        instance_id = instance["ID"]
135                        metadata_response = await client.get(
136                            f"{orthanc_url}/instances/{instance_id}/simplified-tags",
137                            auth=auth,
138                        )
139                        metadata = metadata_response.json()
140
141                        # Remove files that are not multiframe US's
142                        if not (
143                            metadata.get("SOPClassUID")
144                            == "1.2.840.10008.5.1.4.1.1.3.1"
145                            and int(metadata.get("NumberOfFrames", 0)) > 1
146                        ):
147                            continue
148
149                        acq_date, acq_time = metadata.get(
150                            "AcquisitionDate"
151                        ), metadata.get("AcquisitionTime")
152                        if acq_date and acq_time:
153                            acq_datetime = datetime.strptime(
154                                f"{acq_date}{acq_time.split('.')[0]}",
155                                "%Y%m%d%H%M%S",
156                            )
157
158                            images_with_dates.append(
159                                (acq_datetime, instance_id, metadata)
160                            )
161
162        final_images_with_dates = []
163
164        if len(images_with_dates) == 0:
165            return [], latest_acq_time
166
167        for acq_datetime, instance_id, metadata in images_with_dates:
168            if acq_datetime > latest_acq_time:
169                file_response = await client.get(
170                    f"{orthanc_url}/instances/{instance_id}/file",
171                    auth=auth,
172                )
173
174                # hash the main instance_id to be smaller
175                small_id = consistent_hash(instance_id, 10**8)
176                patient_id_real = metadata.get("PatientID")
177                instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
178
179                latest_acq_time = max(latest_acq_time, acq_datetime)
180
181                final_images_with_dates.append(
182                    (acq_datetime, file_response.content, instance_id)
183                )
184
185        images_with_dates.sort(key=lambda x: x[0])
186        latest_image = images_with_dates[-1]
187
188        # inject file_response.content into latest_image
189        file_response = await client.get(
190            f"{orthanc_url}/instances/{latest_image[1]}/file",
191            auth=auth,
192        )
193
194        instance_id = latest_image[1]
195        acq_datetime = latest_image[0]
196        metadata = latest_image[2]
197        small_id = consistent_hash(instance_id, 10**8)
198        patient_id_real = metadata.get("PatientID")
199        instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
200
201        latest_image = (
202            acq_datetime,
203            file_response.content,
204            instance_id,
205        )
206
207        if not final_images_with_dates:
208            final_images_with_dates = [latest_image]
209
210    return [
211        (image, instance_id)
212        for _, image, instance_id in final_images_with_dates
213    ], latest_acq_time
214
215
216class UnauthorizedException(Exception):
217    """Raised when the user is unauthorized and should be redirected."""
218
219    pass
220
221
222def validate_auth_token(auth_token: str):
223    token_data = TOKEN_STORE.get(auth_token)
224    if not token_data or token_data["expires"] < datetime.utcnow():
225        raise UnauthorizedException
226    return token_data["username"]
227
228
229def validate_api_token(api_token: str):
230    token_data = API_TOKEN_STORE.get(api_token)
231    if not token_data or token_data["expires"] < datetime.utcnow():
232        raise HTTPException(status_code=403, detail="Invalid API token.")
233    return token_data["username"]
def generate_token():
22def generate_token():
23    return secrets.token_urlsafe(32)
def consistent_hash(value, mod):
26def consistent_hash(value, mod):
27    # Convert the input to a string and encode it
28    encoded_value = str(value).encode()
29    # Use a reproducible hash function (e.g., SHA-256)
30    hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16)
31    return hash_value % mod
TOKEN_STORE = {}
API_TOKEN_STORE = {}
async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
38async def save_dicom_and_get_results(
39    live_batchdir, instance_id, dicom_content, config
40):
41    # Save the DICOM file in the appropriate directory
42    dicom_path = f"{live_batchdir}/{instance_id}.dcm"
43    if dicom_content is not None:
44        # if path already exists, return
45        if os.path.exists(dicom_path):
46            return
47        with open(dicom_path, "wb") as f:
48            f.write(dicom_content)
49
50    # Process the DICOM
51    result = await asyncio.to_thread(
52        retuve_run,
53        hip_mode=config.batch.hip_mode,
54        config=config,
55        modes_func=config.batch.mode_func,
56        modes_func_kwargs_dict={},
57        file=dicom_path,
58    )
59
60    return result
async def save_results(instance_id, savedir, result=None, just_paths=False):
63async def save_results(instance_id, savedir, result=None, just_paths=False):
64    """
65    Saves DICOM content, video, image, and metrics results for a given instance ID.
66
67    :param instance_id: The unique ID of the DICOM instance.
68    :param result: The result object returned by `retuve_run`.
69    :param savedir: The base directory for saving the results.
70    :param just_paths: Whether to return just the paths of the saved results.
71    """
72    result_dir = f"{savedir}/{instance_id}"
73    os.makedirs(result_dir, exist_ok=True)
74
75    video_path = f"{result_dir}/video.mp4"
76    img_path = f"{result_dir}/img.jpg"
77    metrics_path = f"{result_dir}/metrics.json"
78
79    if just_paths:
80        return video_path, img_path, metrics_path
81
82    # Save video result if available
83    if result.video_clip:
84        await asyncio.to_thread(result.video_clip.write_videofile, video_path)
85
86    # Save image result if available
87    if result.image:
88        await asyncio.to_thread(result.image.save, img_path)
89
90    # Save metrics if available
91    if result.metrics:
92        with open(metrics_path, "w") as f:
93            json.dump(result.metrics, f)
94
95    return video_path, img_path, metrics_path

Saves DICOM content, video, image, and metrics results for a given instance ID.

Parameters
  • instance_id: The unique ID of the DICOM instance.
  • result: The result object returned by retuve_run.
  • savedir: The base directory for saving the results.
  • just_paths: Whether to return just the paths of the saved results.
async def get_sorted_dicom_images(orthanc_url, username=None, password=None, latest_time=None):
 98async def get_sorted_dicom_images(
 99    orthanc_url, username=None, password=None, latest_time=None
100):
101    """
102    Fetch and sort DICOM images from an Orthanc server based on acquisition time.
103
104    :param orthanc_url: The Orthanc server URL.
105    :param username: Username for authentication (optional).
106    :param password: Password for authentication (optional).
107    :param latest_time: Filter for acquisition times after this datetime.
108    :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).
109    """
110
111    auth = (username, password) if username and password else None
112    images_with_dates = []
113    latest_acq_time = latest_time or datetime.min
114
115    async with AsyncClient() as client:
116        patients_response = await client.get(
117            f"{orthanc_url}/patients", auth=auth
118        )
119        for patient_id in patients_response.json():
120            studies_response = await client.get(
121                f"{orthanc_url}/patients/{patient_id}/studies", auth=auth
122            )
123            for study in studies_response.json():
124                study_id = study["ID"]
125                series_response = await client.get(
126                    f"{orthanc_url}/studies/{study_id}/series", auth=auth
127                )
128                for series in series_response.json():
129                    series_id = series["ID"]
130                    instances_response = await client.get(
131                        f"{orthanc_url}/series/{series_id}/instances",
132                        auth=auth,
133                    )
134                    for instance in instances_response.json():
135                        instance_id = instance["ID"]
136                        metadata_response = await client.get(
137                            f"{orthanc_url}/instances/{instance_id}/simplified-tags",
138                            auth=auth,
139                        )
140                        metadata = metadata_response.json()
141
142                        # Remove files that are not multiframe US's
143                        if not (
144                            metadata.get("SOPClassUID")
145                            == "1.2.840.10008.5.1.4.1.1.3.1"
146                            and int(metadata.get("NumberOfFrames", 0)) > 1
147                        ):
148                            continue
149
150                        acq_date, acq_time = metadata.get(
151                            "AcquisitionDate"
152                        ), metadata.get("AcquisitionTime")
153                        if acq_date and acq_time:
154                            acq_datetime = datetime.strptime(
155                                f"{acq_date}{acq_time.split('.')[0]}",
156                                "%Y%m%d%H%M%S",
157                            )
158
159                            images_with_dates.append(
160                                (acq_datetime, instance_id, metadata)
161                            )
162
163        final_images_with_dates = []
164
165        if len(images_with_dates) == 0:
166            return [], latest_acq_time
167
168        for acq_datetime, instance_id, metadata in images_with_dates:
169            if acq_datetime > latest_acq_time:
170                file_response = await client.get(
171                    f"{orthanc_url}/instances/{instance_id}/file",
172                    auth=auth,
173                )
174
175                # hash the main instance_id to be smaller
176                small_id = consistent_hash(instance_id, 10**8)
177                patient_id_real = metadata.get("PatientID")
178                instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
179
180                latest_acq_time = max(latest_acq_time, acq_datetime)
181
182                final_images_with_dates.append(
183                    (acq_datetime, file_response.content, instance_id)
184                )
185
186        images_with_dates.sort(key=lambda x: x[0])
187        latest_image = images_with_dates[-1]
188
189        # inject file_response.content into latest_image
190        file_response = await client.get(
191            f"{orthanc_url}/instances/{latest_image[1]}/file",
192            auth=auth,
193        )
194
195        instance_id = latest_image[1]
196        acq_datetime = latest_image[0]
197        metadata = latest_image[2]
198        small_id = consistent_hash(instance_id, 10**8)
199        patient_id_real = metadata.get("PatientID")
200        instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
201
202        latest_image = (
203            acq_datetime,
204            file_response.content,
205            instance_id,
206        )
207
208        if not final_images_with_dates:
209            final_images_with_dates = [latest_image]
210
211    return [
212        (image, instance_id)
213        for _, image, instance_id in final_images_with_dates
214    ], latest_acq_time

Fetch and sort DICOM images from an Orthanc server based on acquisition time.

Parameters
  • orthanc_url: The Orthanc server URL.
  • username: Username for authentication (optional).
  • password: Password for authentication (optional).
  • latest_time: Filter for acquisition times after this datetime.
Returns

A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).

class UnauthorizedException(builtins.Exception):
217class UnauthorizedException(Exception):
218    """Raised when the user is unauthorized and should be redirected."""
219
220    pass

Raised when the user is unauthorized and should be redirected.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def validate_auth_token(auth_token: str):
223def validate_auth_token(auth_token: str):
224    token_data = TOKEN_STORE.get(auth_token)
225    if not token_data or token_data["expires"] < datetime.utcnow():
226        raise UnauthorizedException
227    return token_data["username"]
def validate_api_token(api_token: str):
230def validate_api_token(api_token: str):
231    token_data = API_TOKEN_STORE.get(api_token)
232    if not token_data or token_data["expires"] < datetime.utcnow():
233        raise HTTPException(status_code=403, detail="Invalid API token.")
234    return token_data["username"]