ref: 0fed741a87ccc061eff382d306fadd71acdfc57d
dir: /dnn/torch/osce/stndrd/presentation/endoscopy.py/
""" module for inspecting models during inference """
import os
import yaml
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import torch
import numpy as np
# registry of the streams opened by write_data, one entry per key:
# {key : {'fid' : fid, 'fs' : fs, 'dim' : dim, 'dtype' : dtype}}
# fid is the open binary file; fs, dim and dtype are those of the first write under key
_state = dict()
# folder the .bin / .yml files are written to; replaced by init()
_folder = 'endoscopy'
def get_gru_gates(gru, input, state):
    """ evaluates the reset, update and new gate of a GRU's first layer for one step

    Reads hidden_size, weight_ih_l0, weight_hh_l0, bias_ih_l0 and bias_hh_l0 from gru.
    input and state are squeezed before they are multiplied with the weights.

    Returns a dict with the keys 'reset_gate', 'update_gate' and 'new_gate'.
    """
    width = gru.hidden_size
    bias_in, bias_rec = gru.bias_ih_l0, gru.bias_hh_l0

    from_input = torch.matmul(gru.weight_ih_l0, input.squeeze())
    from_state = torch.matmul(gru.weight_hh_l0, state.squeeze())

    # the stacked parameters are read as three blocks of hidden_size rows: reset, update, new
    reset_rows, update_rows, new_rows = [slice(k * width, (k + 1) * width) for k in range(3)]

    def _sigmoid_gate(rows):
        return torch.sigmoid(from_input[rows] + bias_in[rows] + from_state[rows] + bias_rec[rows])

    reset_gate = _sigmoid_gate(reset_rows)
    update_gate = _sigmoid_gate(update_rows)

    # the reset gate scales the recurrent contribution only (its bias included)
    gated_recurrent = reset_gate * (from_state[new_rows] + bias_rec[new_rows])
    new_gate = torch.tanh(from_input[new_rows] + bias_in[new_rows] + gated_recurrent)

    return {'reset_gate' : reset_gate, 'update_gate' : update_gate, 'new_gate' : new_gate}
def init(folder='endoscopy'):
    """ selects the endoscopy output folder and creates it if it is missing

    An existing folder is kept as it is; only a warning is printed in that case.
    """
    global _folder
    _folder = folder

    if os.path.exists(folder):
        print(f"warning: endoscopy folder {folder} exists. Content may be lost or inconsistent results may occur.")
        return

    os.makedirs(folder)
def write_data(key, data, fs):
    """ appends data to previous data written under key

    The first write under a key creates <key>.yml (fs, dim, dtype) and <key>.bin (raw bytes)
    in the current endoscopy folder. Every later write has to match the fs, dtype and shape
    of the first one; its bytes are appended to <key>.bin.

    Parameters:
        key  -- name of the stream, used as file stem
        data -- torch.Tensor, numpy array or anything np.asarray can convert
        fs   -- sampling rate stored in the metadata file

    Raises:
        ValueError -- if fs, dtype or shape differ from the first write under key
    """
    # convert to numpy; tensors are moved to host memory first since Tensor.numpy()
    # is only available for CPU tensors
    if isinstance(data, torch.Tensor):
        data = data.detach().cpu().numpy()
    else:
        # no-op for numpy arrays, makes python scalars and sequences usable as well
        data = np.asarray(data)

    dim = tuple(data.shape)
    dtype = str(data.dtype)

    entry = _state.get(key)
    if entry is None:
        # metadata goes first, so a failing write does not leave an open handle registered
        with open(os.path.join(_folder, key + '.yml'), 'w') as f:
            f.write(yaml.dump({'fs' : fs, 'dim' : dim, 'dtype' : dtype.split('.')[-1]}))

        entry = {
            'fid'   : open(os.path.join(_folder, key + '.bin'), 'wb'),
            'fs'    : fs,
            'dim'   : dim,
            'dtype' : dtype
        }
        _state[key] = entry
    else:
        if entry['fs'] != fs:
            raise ValueError(f"fs changed for key {key}: {entry['fs']} vs. {fs}")
        if entry['dtype'] != dtype:
            raise ValueError(f"dtype changed for key {key}: {entry['dtype']} vs. {dtype}")
        if entry['dim'] != dim:
            raise ValueError(f"dim changed for key {key}: {entry['dim']} vs. {dim}")

    entry['fid'].write(data.tobytes())
def close(folder='endoscopy'):
    """ clean up

    Closes every file registered in _state and then empties the registry. Without the
    reset, a later write_data call under an already known key would try to write to a
    closed file.

    The folder argument is not used; it is kept so that existing calls stay valid.
    """
    for entry in _state.values():
        entry['fid'].close()

    # forget the closed handles; the next write under a key starts a fresh stream
    _state.clear()
def read_data(folder='endoscopy'):
    """ retrieves written data as numpy arrays

    Every <key>.yml found in folder is loaded together with <key>.bin. The returned dict
    maps key to the loaded metadata, extended by the entry 'data' which holds the content
    of the .bin file reshaped to (-1,) + dim.
    """
    suffix = '.yml'
    collected = {}

    for filename in os.listdir(folder):
        if not filename.endswith(suffix):
            continue
        key = filename[:-len(suffix)]

        # NOTE(review): FullLoader is presumably needed because dim is dumped as a python
        # tuple by write_data; only read folders from trusted sources
        with open(os.path.join(folder, key + '.yml'), 'r') as meta_file:
            entry = yaml.load(meta_file.read(), Loader=yaml.FullLoader)

        with open(os.path.join(folder, key + '.bin'), 'rb') as raw_file:
            flat = np.frombuffer(raw_file.read(), dtype=entry['dtype'])

        # leading axis counts the individual writes
        entry['data'] = flat.reshape((-1,) + entry['dim'])
        collected[key] = entry

    return collected
def get_best_reshape(shape, target_ratio=1):
    """ calculates the best 2d reshape of shape given the target ratio (rows/cols)

    Parameters:
        shape        -- sequence of dimensions; their product is the number of pixels
        target_ratio -- desired rows / cols of the result, must be positive

    Returns:
        (1,) if there is exactly one pixel (this includes an empty shape), otherwise
        (num_rows, num_columns) with num_rows * num_columns equal to the number of pixels.

    Raises:
        ValueError -- if target_ratio is not positive
    """
    if target_ratio <= 0:
        raise ValueError(f"target_ratio must be positive, got {target_ratio}")

    pixel_count = 1
    for extent in shape:
        pixel_count *= extent

    if pixel_count == 1:
        return (1,)

    # rows * cols = pixel_count and rows / cols = target_ratio => cols = sqrt(pixel_count / target_ratio).
    # The truncated estimate is 0 for empty data or pixel_count < target_ratio, so clamp it
    # to keep the modulo below defined.
    num_columns = max(1, int((pixel_count / target_ratio)**.5))

    # step down to the next divisor so that the reshape is exact
    while pixel_count % num_columns:
        num_columns -= 1

    num_rows = pixel_count // num_columns

    return (num_rows, num_columns)
def get_type_and_shape(shape):
    """ decides how entries of the given shape are displayed

    Returns:
        ('plot', (1,))      if an entry consists of a single value,
        ('image', shape)    if shape is 2-dimensional and not just a row or column vector,
        ('image', reshape)  otherwise, with reshape taken from get_best_reshape.
    """
    # an empty shape stands for a single scalar per entry
    if len(shape) == 0:
        shape = (1,)

    # calculate pixel count
    pixel_count = 1
    for extent in shape:
        pixel_count *= extent

    if pixel_count == 1:
        return 'plot', (1, )

    # stay with shape if already 2-dimensional, unless one axis carries all pixels:
    # a (1, N) or (N, 1) vector is better shown re-arranged
    if len(shape) == 2:
        is_vector = (shape[0] == pixel_count) or (shape[1] == pixel_count)
        # empty entries keep their shape, there is nothing to re-arrange
        if pixel_count == 0 or not is_vector:
            return 'image', shape

    return 'image', get_best_reshape(shape)
def make_animation(data, filename, start_index=80, stop_index=-80, interval=20, half_signal_window_length=80):
    """ renders one subplot per key and saves the result as mp4

    Parameters:
        data      -- dict in the layout produced by read_data: key -> {'fs', 'dim', 'data', ...}
        filename  -- output file, '.mp4' is appended if missing
        start_index, stop_index
                  -- frame range, counted in entries of the key with the highest fs;
                     a negative stop_index counts from the end
        interval  -- forwarded to matplotlib's ArtistAnimation
        half_signal_window_length
                  -- number of values shown to each side of the current position
                     for keys that are displayed as a plot

    Raises:
        ValueError -- if data is empty
    """
    if not data:
        raise ValueError("make_animation needs at least one key to display")

    keys = sorted(data.keys())

    # determine plot setup; int() truncates to 0 rows for a single key, hence the lower bound
    num_keys = len(keys)
    num_rows = max(1, int((num_keys * 3/4) ** .5))
    num_cols = (num_keys + num_rows - 1) // num_rows

    # squeeze=False keeps axs 2-dimensional for single-row and single-column layouts too
    fig, axs = plt.subplots(num_rows, num_cols, squeeze=False)
    fig.set_size_inches(num_cols * 5, num_rows * 5)

    fs_max = max(val['fs'] for val in data.values())
    num_samples = max(val['data'].shape[0] for val in data.values())

    # inspect data
    display = dict()
    for i, key in enumerate(keys):
        ax = axs[i // num_cols, i % num_cols]
        ax.title.set_text(key)
        kind, shape = get_type_and_shape(data[key]['dim'])
        display[key] = {
            'ax'          : ax,
            'type'        : kind,
            'shape'       : shape,
            'down_factor' : data[key]['fs'] / fs_max
        }

    start_index = max(start_index, half_signal_window_length)
    # same result as adding num_samples until non-negative, but cannot loop forever on empty data
    if stop_index < 0 and num_samples > 0:
        stop_index %= num_samples
    stop_index = min(stop_index, num_samples - half_signal_window_length)

    # actual plotting
    frames = []
    for index in range(start_index, stop_index):
        ims = []
        for key in keys:
            entry = display[key]
            ax = entry['ax']
            series = data[key]['data']

            # index runs at the highest rate; translate it to this key's own rate and
            # keep it inside the available entries
            feature_index = int(round(index * entry['down_factor']))
            feature_index = max(0, min(feature_index, len(series) - 1))

            if entry['type'] == 'plot':
                # window around the current position, cut off at the start of the data;
                # the marker stays on the current position
                first = max(feature_index - half_signal_window_length, 0)
                window = series[first : feature_index + half_signal_window_length]
                ims.append(ax.plot(window, marker='P', markevery=[feature_index - first], animated=True, color='blue')[0])

            elif entry['type'] == 'image':
                ims.append(ax.imshow(series[feature_index].reshape(entry['shape']), animated=True))

        frames.append(ims)

    ani = animation.ArtistAnimation(fig, frames, interval=interval, blit=True, repeat_delay=1000)

    if not filename.endswith('.mp4'):
        filename += '.mp4'

    try:
        ani.save(filename)
    finally:
        # release the figure, otherwise it stays registered with pyplot after every call
        plt.close(fig)