retuve.hip_us.multiframe.graf

High Level Functions for Multiframe Analysis

  1# Copyright 2024 Adam McArthur
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""
 16High Level Functions for Multiframe Analysis
 17"""
 18
 19import json
 20import os
 21from typing import List
 22
 23import cv2
 24import numpy as np
 25from filelock import FileLock
 26
 27from retuve.classes.seg import SegFrameObjects
 28from retuve.hip_us.classes.enums import HipLabelsUS
 29from retuve.hip_us.classes.general import HipDatasUS
 30from retuve.keyphrases.config import Config
 31from retuve.keyphrases.enums import GrafSelectionMethod, MetricUS
 32from retuve.logs import ulogger
 33
 34DO_CALIBRATION = False
 35
 36
 37def _get_left_apex_angle(hip) -> bool:
 38    """
 39    Check if the left apex line is flat.
 40
 41    :param hip: HipDataUS object.
 42
 43    :return: Boolean indicating if the left apex line is flat.
 44    """
 45    if hip.landmarks is None:
 46        return True
 47
 48    C, A, B = (
 49        np.array(hip.landmarks.left),
 50        np.array(hip.landmarks.apex),
 51        np.array((hip.landmarks.apex[0], hip.landmarks.left[1])),
 52    )
 53
 54    a = np.linalg.norm(C - B)
 55    b = np.linalg.norm(C - A)
 56
 57    angle = np.arccos((a**2 + b**2 - np.linalg.norm(A - B) ** 2) / (2 * a * b))
 58    angle = np.degrees(angle)
 59
 60    return int(angle)
 61
 62
 63def _get_os_ichium_area(seg_frame_objs) -> float:
 64    os_ichium = [
 65        seg_obj
 66        for seg_obj in seg_frame_objs
 67        if seg_obj.cls == HipLabelsUS.OsIchium
 68    ]
 69    os_ishium_area = 0
 70    # Gives a value of 0 or 2
 71    if len(os_ichium) != 0:
 72        os_ishium_area = round(os_ichium[0].area(), 1)
 73
 74    return os_ishium_area
 75
 76
 77def _get_femoral_head_area(seg_frame_objs) -> float:
 78    femoral_head = [
 79        seg_obj
 80        for seg_obj in seg_frame_objs
 81        if seg_obj.cls == HipLabelsUS.FemoralHead
 82    ]
 83    femoral_head_area = 0
 84    if len(femoral_head) != 0:
 85        femoral_head_area = round(femoral_head[0].area(), 1)
 86
 87    return femoral_head_area
 88
 89
 90def _get_apex_right_distance(hip) -> float:
 91    apex_right_distance = 0
 92    if hip.landmarks:
 93        apex_right_distance = abs(
 94            hip.landmarks.apex[1] - hip.landmarks.right[1]
 95        )
 96
 97    return apex_right_distance
 98
 99
100def _get_femoral_head_roundness(seg_frame_objs) -> float:
101    femoral_head = [
102        seg_obj
103        for seg_obj in seg_frame_objs
104        if seg_obj.cls == HipLabelsUS.FemoralHead
105    ]
106    roundness_ratio = 0
107    if len(femoral_head) != 0:
108        foreground_mask = (
109            np.all(femoral_head[0].mask == [255, 255, 255], axis=-1).astype(
110                np.uint8
111            )
112            * 255
113        )
114
115        # Step 2: Detect edges
116        edges = cv2.Canny(foreground_mask, 50, 150)
117
118        # Step 3: Fit a circle to the detected edges
119        contours, _ = cv2.findContours(
120            edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
121        )
122        if contours:
123            # Get the largest contour (assuming it's the femoral head)
124            largest_contour = max(contours, key=cv2.contourArea)
125
126            # Fit a minimum enclosing circle around the contour
127            (x, y), radius = cv2.minEnclosingCircle(largest_contour)
128            circle_area = np.pi * (radius**2)
129            contour_area = cv2.contourArea(largest_contour)
130
131            # Step 4: Calculate roundness
132            # Roundness ratio of contour area to enclosing circle area (closer to 1 is more round)
133            roundness_ratio = (
134                contour_area / circle_area if circle_area != 0 else 0
135            )
136
137    return roundness_ratio
138
139
140def graf_frame_algo(
141    hip_zipped_data,
142    max_alpha,
143    first_illium_frame,
144    last_illium_frame,
145    file_id=None,
146):
147    """
148    Get the Graf Frame for the hip US module as a weighted
149    mix of the max alpha value and flatness of the angle.
150
151    See an analysis of how these weights were found here: https://files.mcaq.me/xj3kb.png.
152
153    The code will be included directly in Retuve in the future update.
154
155    :param hip_zipped_data: The hip data and results.
156    :param max_alpha: The Max Alpha
157    :param first_illium_frame: The first illium frame.
158    :param last_illium_frame: The last illium frame.
159
160    :return: Weighted score based on alpha and flatness.
161    """
162
163    alpha_weight = 5.6  # Largest Alpha Angle
164    line_flatness_weight = 4.71  # Flatness of illium
165    os_ishium_weight = 14.51  # Os Ichium Area
166    femoral_head_weight = 12.81  # Femoral Head Area
167    apex_right_distance_weight = 0.96  # Distance between apex and right
168    femoral_head_roundness_weight = 1.42  # Roundness of femoral head
169    graf_frame_position_weight = 0.16  # Position of the frame in the illium
170
171    hip_data, seg_frame_objs = hip_zipped_data
172
173    # Gives values between 1 and 7
174    alpha_normalisation = max_alpha / 4
175    alpha_value = round(
176        hip_data.get_metric(MetricUS.ALPHA) / alpha_normalisation, 2
177    )
178
179    line_flattness_normalisation = 2
180    # Gives values varying between 0 and 10
181    line_flatness_value = (
182        10 - _get_left_apex_angle(hip_data)
183    ) / line_flattness_normalisation
184
185    # Print the image area
186    image_area = seg_frame_objs.img.shape[0] * seg_frame_objs.img.shape[1]
187
188    os_ichium_normalisation = image_area / 140
189    os_ichium_value = (
190        _get_os_ichium_area(seg_frame_objs) / os_ichium_normalisation
191    )
192
193    # do the same thing for the femoral head
194    femoral_head_normalisation = image_area / 14
195    femoral_head_value = (
196        _get_femoral_head_area(seg_frame_objs) / femoral_head_normalisation
197    )
198
199    apex_right_distance_normalisation = seg_frame_objs.img.shape[0] / 20
200    apex_right_distance_value = (
201        _get_apex_right_distance(hip_data) / apex_right_distance_normalisation
202    )
203
204    # Weigh being in the middle third of the illium frames
205    illium_third_length = (last_illium_frame - first_illium_frame) // 3
206    middle_third_range = range(
207        first_illium_frame + illium_third_length,
208        last_illium_frame - illium_third_length,
209    )
210
211    graf_frame_position_value = 0
212    if hip_data.frame_no in middle_third_range:
213        graf_frame_position_value = 3
214
215    femoral_head_roundness_normalisation = 2
216    femoral_head_roundness_value = (
217        _get_femoral_head_roundness(seg_frame_objs)
218        * femoral_head_roundness_normalisation
219    )
220
221    # Calculate the weighted score based on alpha and angle flatness
222    final_score = round(
223        (
224            alpha_weight * alpha_value
225            + line_flatness_weight * line_flatness_value
226            + os_ishium_weight * os_ichium_value
227            + femoral_head_weight * femoral_head_value
228            + apex_right_distance_weight * apex_right_distance_value
229            + graf_frame_position_weight * graf_frame_position_value
230            + femoral_head_roundness_weight * femoral_head_roundness_value
231        ),
232        2,
233    )
234
235    if file_id and DO_CALIBRATION:
236        data = {
237            "alpha_value": alpha_value,
238            "line_flatness_value": line_flatness_value,
239            "os_ichium_value": os_ichium_value,
240            "femoral_head_value": femoral_head_value,
241            "apex_right_distance_value": apex_right_distance_value,
242            "graf_frame_position_value": graf_frame_position_value,
243            "femoral_head_roundness_value": femoral_head_roundness_value,
244        }
245
246        # Create folder for the JSON file
247        json_folder = f"./scripts/val/cali/"
248        os.makedirs(json_folder, exist_ok=True)
249
250        # Define the path to the JSON file and the lock file
251        json_file_path = f"{json_folder}/{file_id.replace('.dcm', '.json')}"
252        lock_file_path = f"{json_folder}/{file_id.replace('.dcm', '.lock')}"
253
254        # Use a file lock to prevent concurrent access issues
255        lock = FileLock(lock_file_path)
256
257        # Acquire the lock before reading or writing
258        with lock:
259            # If the JSON file already exists, load its contents; otherwise, start with an empty dictionary
260            if os.path.exists(json_file_path):
261                with open(json_file_path, "r") as f:
262                    hip_data_dict = json.load(f)
263            else:
264                hip_data_dict = {}
265
266            # Add or update the data for this frame_no
267            hip_data_dict[hip_data.frame_no] = data
268
269            # Save the updated data back to the JSON file
270            with open(json_file_path, "w") as f:
271                json.dump(hip_data_dict, f, indent=4)
272
273    return final_score
274
275
276def find_graf_plane(
277    hip_datas: HipDatasUS, results: List[SegFrameObjects], config: Config
278) -> HipDatasUS:
279    """
280    Find the Graf Plane for the hip US module.
281
282    :param hip_datas: The hip data.
283    :param results: The results of the segmentation.
284    :param config: The configuration.
285
286    :return: The hip data with the Graf Plane.
287    """
288
289    hip_datas.graf_confs = []
290
291    if config.hip.graf_selection_method == GrafSelectionMethod.MANUAL_FEATURES:
292        return find_graf_plane_manual_features(hip_datas, results, config)
293
294    elif config.hip.graf_selection_method == GrafSelectionMethod.OBJ_CLASSIFY:
295        raise NotImplementedError(
296            f"Unsupported Graf Selection Method: {config.hip.graf_selection_method}"
297        )
298
299    else:
300        raise NotImplementedError(
301            f"Unsupported Graf Selection Method: {config.hip.graf_selection_method}"
302        )
303
304
305def find_graf_plane_manual_features(
306    hip_datas: HipDatasUS, results: List[SegFrameObjects], config: Config
307) -> HipDatasUS:
308    """
309    Find the Graf Plane for the hip US module.
310
311    :param hip_datas: The hip data.
312    :param results: The results of the segmentation.
313
314    :return: The hip data with the Graf Plane.
315    """
316
317    if config.hip.graf_algo_threshold:
318        GRAF_THRESHOLD = config.hip.graf_algo_threshold
319    else:
320        # NOTE(adamcarthur) - this is so that previous behavior is maintained
321        GRAF_THRESHOLD = 1
322
323    any_good_graf_data = [
324        (hip_data, seg_frame_objs)
325        for hip_data, seg_frame_objs in zip(hip_datas, results)
326        if hip_data.metrics
327        and all(metric.value != 0 for metric in hip_data.metrics)
328    ]
329
330    if len(any_good_graf_data) == 0:
331        ulogger.warning("No good graf frames found")
332        hip_datas.recorded_error.append("No Perfect Grafs Frames found.")
333        hip_datas.recorded_error.critical = True
334        any_good_graf_frames = hip_datas.hip_datas
335        any_good_graf_data = zip(any_good_graf_frames, results)
336    else:
337        any_good_graf_frames, _ = zip(*any_good_graf_data)
338
339    # get max alpha frame
340    max_alpha = max(
341        any_good_graf_frames,
342        key=lambda hip_data: hip_data.get_metric(MetricUS.ALPHA),
343    ).get_metric(MetricUS.ALPHA)
344
345    all_illiums = [
346        hip_data
347        for hip_data in hip_datas
348        if hip_data.get_metric(MetricUS.ALPHA) != 0
349    ]
350
351    if len(all_illiums) > 0:
352        hip_datas.graf_confs = [
353            graf_frame_algo(
354                (hip_data, seg_frame_objs),
355                max_alpha,
356                all_illiums[0].frame_no,
357                all_illiums[-1].frame_no,
358                None,
359            )
360            / GRAF_THRESHOLD
361            for hip_data, seg_frame_objs in zip(hip_datas, results)
362        ]
363
364    if len(all_illiums) != 0:
365        first_illium_frame = all_illiums[0].frame_no
366        last_illium_frame = all_illiums[-1].frame_no
367
368    if max_alpha == 0:
369        hip_datas.recorded_error.append("Max Alpha is 0.")
370        hip_datas.recorded_error.critical = True
371        return hip_datas
372
373    if not hasattr(hip_datas, "file_id"):
374        hip_datas.file_id = None
375
376    graf_hip, _ = max(
377        any_good_graf_data,
378        key=lambda hip_zipped_data: graf_frame_algo(
379            hip_zipped_data,
380            max_alpha,
381            first_illium_frame,
382            last_illium_frame,
383            hip_datas.file_id,
384        ),
385    )
386
387    # pick the index closest to the center
388    center = len(hip_datas.hip_datas) // 2
389    graf_frame = min(
390        [graf_hip.frame_no],
391        key=lambda index: abs(index - center),
392    )
393
394    best_conf = hip_datas.graf_confs[graf_frame]
395    if best_conf < 1:
396        ulogger.warning("No high-quality Graf Frames found")
397        hip_datas.recorded_error.append("No High-Quality Graf Frames found.")
398        hip_datas.recorded_error.critical = True
399        return hip_datas
400
401    hip_datas.graf_frame = graf_frame
402
403    hip_datas.grafs_hip = [
404        hip for hip in hip_datas if hip.frame_no == graf_frame
405    ][0]
406
407    return hip_datas
DO_CALIBRATION = False
def graf_frame_algo( hip_zipped_data, max_alpha, first_illium_frame, last_illium_frame, file_id=None):
141def graf_frame_algo(
142    hip_zipped_data,
143    max_alpha,
144    first_illium_frame,
145    last_illium_frame,
146    file_id=None,
147):
148    """
149    Get the Graf Frame for the hip US module as a weighted
150    mix of the max alpha value and flatness of the angle.
151
152    See an analysis of how these weights were found here: https://files.mcaq.me/xj3kb.png.
153
154    The code will be included directly in Retuve in the future update.
155
156    :param hip_zipped_data: The hip data and results.
157    :param max_alpha: The Max Alpha
158    :param first_illium_frame: The first illium frame.
159    :param last_illium_frame: The last illium frame.
160
161    :return: Weighted score based on alpha and flatness.
162    """
163
164    alpha_weight = 5.6  # Largest Alpha Angle
165    line_flatness_weight = 4.71  # Flatness of illium
166    os_ishium_weight = 14.51  # Os Ichium Area
167    femoral_head_weight = 12.81  # Femoral Head Area
168    apex_right_distance_weight = 0.96  # Distance between apex and right
169    femoral_head_roundness_weight = 1.42  # Roundness of femoral head
170    graf_frame_position_weight = 0.16  # Position of the frame in the illium
171
172    hip_data, seg_frame_objs = hip_zipped_data
173
174    # Gives values between 1 and 7
175    alpha_normalisation = max_alpha / 4
176    alpha_value = round(
177        hip_data.get_metric(MetricUS.ALPHA) / alpha_normalisation, 2
178    )
179
180    line_flattness_normalisation = 2
181    # Gives values varying between 0 and 10
182    line_flatness_value = (
183        10 - _get_left_apex_angle(hip_data)
184    ) / line_flattness_normalisation
185
186    # Print the image area
187    image_area = seg_frame_objs.img.shape[0] * seg_frame_objs.img.shape[1]
188
189    os_ichium_normalisation = image_area / 140
190    os_ichium_value = (
191        _get_os_ichium_area(seg_frame_objs) / os_ichium_normalisation
192    )
193
194    # do the same thing for the femoral head
195    femoral_head_normalisation = image_area / 14
196    femoral_head_value = (
197        _get_femoral_head_area(seg_frame_objs) / femoral_head_normalisation
198    )
199
200    apex_right_distance_normalisation = seg_frame_objs.img.shape[0] / 20
201    apex_right_distance_value = (
202        _get_apex_right_distance(hip_data) / apex_right_distance_normalisation
203    )
204
205    # Weigh being in the middle third of the illium frames
206    illium_third_length = (last_illium_frame - first_illium_frame) // 3
207    middle_third_range = range(
208        first_illium_frame + illium_third_length,
209        last_illium_frame - illium_third_length,
210    )
211
212    graf_frame_position_value = 0
213    if hip_data.frame_no in middle_third_range:
214        graf_frame_position_value = 3
215
216    femoral_head_roundness_normalisation = 2
217    femoral_head_roundness_value = (
218        _get_femoral_head_roundness(seg_frame_objs)
219        * femoral_head_roundness_normalisation
220    )
221
222    # Calculate the weighted score based on alpha and angle flatness
223    final_score = round(
224        (
225            alpha_weight * alpha_value
226            + line_flatness_weight * line_flatness_value
227            + os_ishium_weight * os_ichium_value
228            + femoral_head_weight * femoral_head_value
229            + apex_right_distance_weight * apex_right_distance_value
230            + graf_frame_position_weight * graf_frame_position_value
231            + femoral_head_roundness_weight * femoral_head_roundness_value
232        ),
233        2,
234    )
235
236    if file_id and DO_CALIBRATION:
237        data = {
238            "alpha_value": alpha_value,
239            "line_flatness_value": line_flatness_value,
240            "os_ichium_value": os_ichium_value,
241            "femoral_head_value": femoral_head_value,
242            "apex_right_distance_value": apex_right_distance_value,
243            "graf_frame_position_value": graf_frame_position_value,
244            "femoral_head_roundness_value": femoral_head_roundness_value,
245        }
246
247        # Create folder for the JSON file
248        json_folder = f"./scripts/val/cali/"
249        os.makedirs(json_folder, exist_ok=True)
250
251        # Define the path to the JSON file and the lock file
252        json_file_path = f"{json_folder}/{file_id.replace('.dcm', '.json')}"
253        lock_file_path = f"{json_folder}/{file_id.replace('.dcm', '.lock')}"
254
255        # Use a file lock to prevent concurrent access issues
256        lock = FileLock(lock_file_path)
257
258        # Acquire the lock before reading or writing
259        with lock:
260            # If the JSON file already exists, load its contents; otherwise, start with an empty dictionary
261            if os.path.exists(json_file_path):
262                with open(json_file_path, "r") as f:
263                    hip_data_dict = json.load(f)
264            else:
265                hip_data_dict = {}
266
267            # Add or update the data for this frame_no
268            hip_data_dict[hip_data.frame_no] = data
269
270            # Save the updated data back to the JSON file
271            with open(json_file_path, "w") as f:
272                json.dump(hip_data_dict, f, indent=4)
273
274    return final_score

Get the Graf Frame for the hip US module as a weighted mix of the max alpha value and flatness of the angle.

See an analysis of how these weights were found here: https://files.mcaq.me/xj3kb.png.

The code will be included directly in Retuve in the future update.

Parameters
  • hip_zipped_data: The hip data and results.
  • max_alpha: The Max Alpha
  • first_illium_frame: The first illium frame.
  • last_illium_frame: The last illium frame.
Returns

Weighted score based on alpha and flatness.

277def find_graf_plane(
278    hip_datas: HipDatasUS, results: List[SegFrameObjects], config: Config
279) -> HipDatasUS:
280    """
281    Find the Graf Plane for the hip US module.
282
283    :param hip_datas: The hip data.
284    :param results: The results of the segmentation.
285    :param config: The configuration.
286
287    :return: The hip data with the Graf Plane.
288    """
289
290    hip_datas.graf_confs = []
291
292    if config.hip.graf_selection_method == GrafSelectionMethod.MANUAL_FEATURES:
293        return find_graf_plane_manual_features(hip_datas, results, config)
294
295    elif config.hip.graf_selection_method == GrafSelectionMethod.OBJ_CLASSIFY:
296        raise NotImplementedError(
297            f"Unsupported Graf Selection Method: {config.hip.graf_selection_method}"
298        )
299
300    else:
301        raise NotImplementedError(
302            f"Unsupported Graf Selection Method: {config.hip.graf_selection_method}"
303        )

Find the Graf Plane for the hip US module.

Parameters
  • hip_datas: The hip data.
  • results: The results of the segmentation.
  • config: The configuration.
Returns

The hip data with the Graf Plane.

306def find_graf_plane_manual_features(
307    hip_datas: HipDatasUS, results: List[SegFrameObjects], config: Config
308) -> HipDatasUS:
309    """
310    Find the Graf Plane for the hip US module.
311
312    :param hip_datas: The hip data.
313    :param results: The results of the segmentation.
314
315    :return: The hip data with the Graf Plane.
316    """
317
318    if config.hip.graf_algo_threshold:
319        GRAF_THRESHOLD = config.hip.graf_algo_threshold
320    else:
321        # NOTE(adamcarthur) - this is so that previous behavior is maintained
322        GRAF_THRESHOLD = 1
323
324    any_good_graf_data = [
325        (hip_data, seg_frame_objs)
326        for hip_data, seg_frame_objs in zip(hip_datas, results)
327        if hip_data.metrics
328        and all(metric.value != 0 for metric in hip_data.metrics)
329    ]
330
331    if len(any_good_graf_data) == 0:
332        ulogger.warning("No good graf frames found")
333        hip_datas.recorded_error.append("No Perfect Grafs Frames found.")
334        hip_datas.recorded_error.critical = True
335        any_good_graf_frames = hip_datas.hip_datas
336        any_good_graf_data = zip(any_good_graf_frames, results)
337    else:
338        any_good_graf_frames, _ = zip(*any_good_graf_data)
339
340    # get max alpha frame
341    max_alpha = max(
342        any_good_graf_frames,
343        key=lambda hip_data: hip_data.get_metric(MetricUS.ALPHA),
344    ).get_metric(MetricUS.ALPHA)
345
346    all_illiums = [
347        hip_data
348        for hip_data in hip_datas
349        if hip_data.get_metric(MetricUS.ALPHA) != 0
350    ]
351
352    if len(all_illiums) > 0:
353        hip_datas.graf_confs = [
354            graf_frame_algo(
355                (hip_data, seg_frame_objs),
356                max_alpha,
357                all_illiums[0].frame_no,
358                all_illiums[-1].frame_no,
359                None,
360            )
361            / GRAF_THRESHOLD
362            for hip_data, seg_frame_objs in zip(hip_datas, results)
363        ]
364
365    if len(all_illiums) != 0:
366        first_illium_frame = all_illiums[0].frame_no
367        last_illium_frame = all_illiums[-1].frame_no
368
369    if max_alpha == 0:
370        hip_datas.recorded_error.append("Max Alpha is 0.")
371        hip_datas.recorded_error.critical = True
372        return hip_datas
373
374    if not hasattr(hip_datas, "file_id"):
375        hip_datas.file_id = None
376
377    graf_hip, _ = max(
378        any_good_graf_data,
379        key=lambda hip_zipped_data: graf_frame_algo(
380            hip_zipped_data,
381            max_alpha,
382            first_illium_frame,
383            last_illium_frame,
384            hip_datas.file_id,
385        ),
386    )
387
388    # pick the index closest to the center
389    center = len(hip_datas.hip_datas) // 2
390    graf_frame = min(
391        [graf_hip.frame_no],
392        key=lambda index: abs(index - center),
393    )
394
395    best_conf = hip_datas.graf_confs[graf_frame]
396    if best_conf < 1:
397        ulogger.warning("No high-quality Graf Frames found")
398        hip_datas.recorded_error.append("No High-Quality Graf Frames found.")
399        hip_datas.recorded_error.critical = True
400        return hip_datas
401
402    hip_datas.graf_frame = graf_frame
403
404    hip_datas.grafs_hip = [
405        hip for hip in hip_datas if hip.frame_no == graf_frame
406    ][0]
407
408    return hip_datas

Find the Graf Plane for the hip US module.

Parameters
  • hip_datas: The hip data.
  • results: The results of the segmentation.
Returns

The hip data with the Graf Plane.