shithub: opus

ref: 60f151b87d4a2a4e14eff2c7208c96f8618e6f22
dir: /dnn/torch/osce/utils/endoscopy.py/

View raw version
""" module for inspecting models during inference """

import os

import yaml
import matplotlib.pyplot as plt
import matplotlib.animation as animation

import torch
import numpy as np

# stores entries {key : {'fid' : fid, 'fs' : fs, 'dim' : dim, 'dtype' : dtype}}
_state = dict()
_folder = 'endoscopy'

def get_gru_gates(gru, input, state):
    hidden_size = gru.hidden_size

    direct = torch.matmul(gru.weight_ih_l0, input.squeeze())
    recurrent = torch.matmul(gru.weight_hh_l0, state.squeeze())

    # reset gate
    start, stop = 0 * hidden_size, 1 * hidden_size
    reset_gate = torch.sigmoid(direct[start : stop] + gru.bias_ih_l0[start : stop] + recurrent[start : stop] + gru.bias_hh_l0[start : stop])

    # update gate
    start, stop = 1 * hidden_size, 2 * hidden_size
    update_gate = torch.sigmoid(direct[start : stop] + gru.bias_ih_l0[start : stop] + recurrent[start : stop] + gru.bias_hh_l0[start : stop])

    # new gate
    start, stop = 2 * hidden_size, 3 * hidden_size
    new_gate = torch.tanh(direct[start : stop] + gru.bias_ih_l0[start : stop] + reset_gate * (recurrent[start : stop] +  gru.bias_hh_l0[start : stop]))

    return {'reset_gate' : reset_gate, 'update_gate' : update_gate, 'new_gate' : new_gate}


def init(folder='endoscopy'):
    """ sets up output folder for endoscopy data """

    global _folder
    _folder = folder

    if not os.path.exists(folder):
        os.makedirs(folder)
    else:
        print(f"warning: endoscopy folder {folder} exists. Content may be lost or inconsistent results may occur.")

def write_data(key, data, fs):
    """ appends data to previous data written under key """

    global _state

    # convert to numpy if torch.Tensor is given
    if isinstance(data, torch.Tensor):
        data = data.detach().numpy()

    if not key in _state:
        _state[key] = {
            'fid'   : open(os.path.join(_folder, key + '.bin'), 'wb'),
            'fs'    : fs,
            'dim'   : tuple(data.shape),
            'dtype' : str(data.dtype)
        }

        with open(os.path.join(_folder, key + '.yml'), 'w') as f:
            f.write(yaml.dump({'fs' : fs, 'dim' : tuple(data.shape), 'dtype' : str(data.dtype).split('.')[-1]}))
    else:
        if _state[key]['fs'] != fs:
            raise ValueError(f"fs changed for key {key}: {_state[key]['fs']} vs. {fs}")
        if _state[key]['dtype'] != str(data.dtype):
            raise ValueError(f"dtype changed for key {key}: {_state[key]['dtype']} vs. {str(data.dtype)}")
        if _state[key]['dim'] != tuple(data.shape):
            raise ValueError(f"dim changed for key {key}: {_state[key]['dim']} vs. {tuple(data.shape)}")

    _state[key]['fid'].write(data.tobytes())

def close(folder='endoscopy'):
    """ clean up """
    for key in _state.keys():
        _state[key]['fid'].close()


def read_data(folder='endoscopy'):
    """ retrieves written data as numpy arrays """


    keys = [name[:-4] for name in os.listdir(folder) if name.endswith('.yml')]

    return_dict = dict()

    for key in keys:
        with open(os.path.join(folder, key + '.yml'), 'r') as f:
            value = yaml.load(f.read(), yaml.FullLoader)

        with open(os.path.join(folder, key + '.bin'), 'rb') as f:
            data = np.frombuffer(f.read(), dtype=value['dtype'])

        value['data'] = data.reshape((-1,) + value['dim'])

        return_dict[key] = value

    return return_dict

def get_best_reshape(shape, target_ratio=1):
    """ calculated the best 2d reshape of shape given the target ratio (rows/cols)"""

    if len(shape) > 1:
        pixel_count = 1
        for s in shape:
            pixel_count *= s
    else:
        pixel_count = shape[0]

    if pixel_count == 1:
        return (1,)

    num_columns = int((pixel_count / target_ratio)**.5)

    while (pixel_count % num_columns):
        num_columns -= 1

    num_rows = pixel_count // num_columns

    return (num_rows, num_columns)

def get_type_and_shape(shape):

    # can happen if data is one dimensional
    if len(shape) == 0:
        shape = (1,)

    # calculate pixel count
    if len(shape) > 1:
        pixel_count = 1
        for s in shape:
            pixel_count *= s
    else:
        pixel_count = shape[0]

    if pixel_count == 1:
        return 'plot', (1, )

    # stay with shape if already 2-dimensional
    if len(shape) == 2:
        if (shape[0] != pixel_count) or (shape[1] != pixel_count):
            return 'image', shape

    return 'image', get_best_reshape(shape)

def make_animation(data, filename, start_index=80, stop_index=-80, interval=20, half_signal_window_length=80):

    # determine plot setup
    num_keys = len(data.keys())

    num_rows = int((num_keys * 3/4) ** .5)

    num_cols = (num_keys + num_rows - 1) // num_rows

    fig, axs = plt.subplots(num_rows, num_cols)
    fig.set_size_inches(num_cols * 5, num_rows * 5)

    display = dict()

    fs_max = max([val['fs'] for val in data.values()])

    num_samples = max([val['data'].shape[0] for val in data.values()])

    keys = sorted(data.keys())

    # inspect data
    for i, key in enumerate(keys):
        axs[i // num_cols, i % num_cols].title.set_text(key)

        display[key] = dict()

        display[key]['type'], display[key]['shape'] = get_type_and_shape(data[key]['dim'])
        display[key]['down_factor'] = data[key]['fs'] / fs_max

    start_index = max(start_index, half_signal_window_length)
    while stop_index < 0:
        stop_index += num_samples

    stop_index = min(stop_index, num_samples - half_signal_window_length)

    # actual plotting
    frames = []
    for index in range(start_index, stop_index):
        ims = []
        for i, key in enumerate(keys):
            feature_index = int(round(index * display[key]['down_factor']))

            if display[key]['type'] == 'plot':
                ims.append(axs[i // num_cols, i % num_cols].plot(data[key]['data'][index - half_signal_window_length : index + half_signal_window_length], marker='P', markevery=[half_signal_window_length], animated=True, color='blue')[0])

            elif display[key]['type'] == 'image':
                ims.append(axs[i // num_cols, i % num_cols].imshow(data[key]['data'][index].reshape(display[key]['shape']), animated=True))

        frames.append(ims)

    ani = animation.ArtistAnimation(fig, frames, interval=interval, blit=True, repeat_delay=1000)

    if not filename.endswith('.mp4'):
        filename += '.mp4'

    ani.save(filename)