MRI图像预处理

时间:2024-05-23 12:25:55

1. 校正偏差域

诸如患者在扫描仪中的位置、扫描仪本身以及许多未知因素都可能导致MR图像上的亮度差异。换句话说,强度值(从黑色到白色)在同一组织内也可能变化,这被称为偏置场。它是一种低频的平滑不良信号,会破坏MR图像。偏置场是由MRI机器磁场的不均匀性引起的。如果不校正偏置场,所有图像处理算法(例如分割(如FreeSurfer)和分类)都会输出不正确的结果。因此在进行分割或分类之前,需要一个预处理步骤来校正偏置场的影响。
如下图所示:

MRI图像预处理

使用N4BiasFieldCorrection可以校正偏置场

 案例展示

MRI图像预处理

使用N4偏置场校正(B)和基于高斯滤波器的方法(C)对原始图像(A)进行不均匀性校正的示例。 底行显示了它们的强度直方图。 白色箭头显示热点的中心。 黑色箭头显示WM信号(左)和GM和福尔马林信号(右)。 请注意,面板(B和C)中的高边界信号是N4算法的已知效果,但是,在进一步图像处理之前,这些边界被移除。

import collections.abc
import os
import shutil
import warnings

import numpy as np
import SimpleITK as sitk
from nipype.interfaces.ants import N4BiasFieldCorrection

def correct_bias(in_file, out_file, image_type=sitk.sitkFloat64):
    """
    Correct the intensity bias field of an MR image.

    Tries ANTs N4BiasFieldCorrection first; if the ANTs executable cannot
    be found, falls back to SimpleITK's (much slower) implementation.

    :param in_file: input path of the .nii file
    :param out_file: path where the bias-corrected file is written
    :param image_type: SimpleITK pixel type used by the fallback path
    :return: full path of the bias-corrected .nii file
    """
    correct = N4BiasFieldCorrection()
    correct.inputs.input_image = in_file
    correct.inputs.output_image = out_file
    try:
        done = correct.run()
        return done.outputs.output_image
    except IOError:
        # nipype raises IOError when the N4BiasFieldCorrection executable
        # is not on PATH; fall back to SimpleITK.
        warnings.warn(RuntimeWarning("ANTs N4BiasFieldCorrection could not be found."
                                     "Will try using SimpleITK for bias field correction"
                                     " which will take much longer. To fix this problem, add N4BiasFieldCorrection"
                                     " to your PATH system variable. (example: EXPORT PATH=${PATH}:/path/to/ants/bin)"))
        input_image = sitk.ReadImage(in_file, image_type)
        # Use the positive-intensity voxels as the correction mask.
        output_image = sitk.N4BiasFieldCorrection(input_image, input_image > 0)
        sitk.WriteImage(output_image, out_file)
        return os.path.abspath(out_file)

def normalize_image(in_file, out_file, bias_correction=True):
    """
    Produce a (bias-corrected) copy of an image file.

    :param in_file: path of the source .nii file
    :param out_file: destination path
    :param bias_correction: when True, run bias-field correction via
        correct_bias; otherwise the file is copied unchanged.
    :return: out_file
    """
    if bias_correction:
        correct_bias(in_file, out_file)
    else:
        # No correction requested: copy the file as-is.
        shutil.copy(in_file, out_file)
    return out_file

reference:

https://blog.****.net/m0_37477175/article/details/88103406

 2. 裁剪所有MRI图像放缩到同一尺度后保存到hdf5文件中

# training_files: list of training MRI files; each entry is a tuple holding the
# full paths of the 4 modality images plus the ground-truth image
# config["image_shape"]: target image size after processing, e.g. (144, 144, 144)
write_data_to_file(training_files, config["data_file"], image_shape=config["image_shape"])


def write_data_to_file(training_data_files, out_file, image_shape, truth_dtype=np.uint8, subject_ids=None,
                       normalize=True, crop=True):
    """
    Write a set of training images to an hdf5 file.

    :param training_data_files: list of tuples, one per subject; each tuple
        holds the full paths of the 4 modality images followed by the
        ground-truth image, e.g.
        ('sub1-T1.nii.gz', 'sub1-T2.nii.gz', 'sub1-flair.nii.gz',
         'sub1-t1ce.nii.gz', 'sub1-truth.nii.gz')
    :param out_file: path of the hdf5 file to create
    :param image_shape: shape each stored image is resized to
    :param truth_dtype: ground-truth dtype, default 8-bit unsigned integer
    :param subject_ids: optional list of subject ids stored alongside the data
    :param normalize: when True, z-score the stored image data
    :param crop: when True, crop each subject to its foreground first
    :return: location of the hdf5 file with the image data written to it
    """
    n_subjects = len(training_data_files)
    # every tuple ends with the ground-truth file; the rest are modalities
    n_channels = len(training_data_files[0]) - 1

    try:
        # create the hdf5 file and three compressed, extendable arrays under its root
        hdf5_file, data_storage, truth_storage, affine_storage = create_data_file(
            out_file,
            n_channels=n_channels,
            n_samples=n_subjects,
            image_shape=image_shape)
    except Exception as e:
        # something went wrong: remove the incomplete data file before re-raising
        os.remove(out_file)
        raise e

    # read, crop (when crop=True) and resize every subject into the storages
    write_image_data_to_file(training_data_files, data_storage, truth_storage, image_shape,
                             truth_dtype=truth_dtype, n_channels=n_channels,
                             affine_storage=affine_storage, crop=crop)
    if subject_ids:
        hdf5_file.create_array(hdf5_file.root, 'subject_ids', obj=subject_ids)
    if normalize:
        # normalize the training data across the whole set
        normalize_data_storage(data_storage)
    hdf5_file.close()
    # return the hdf5 file name
    return out_file

 创建hdf5文件要借助tables库

import tables

 tables使用可参考:https://blog.****.net/lengyuexiang123/article/details/53558779

def create_data_file(out_file, n_channels, n_samples, image_shape):
    """
    Open a new hdf5 file and create three compressed, extendable arrays
    (data, truth, affine) under its root node.

    :param out_file: hdf5 file path
    :param n_channels: number of image modalities (channels)
    :param n_samples: expected number of subjects (used as expectedrows)
    :param image_shape: spatial shape of each stored image
    :return: (open hdf5 file handle, data array, truth array, affine array)
    """
    hdf5_file = tables.open_file(out_file, mode='w')
    # filter describing the compression library and level
    compression = tables.Filters(complevel=5, complib='blosc')

    # the leading dimension is 0 so the arrays can grow one subject at a time
    data_shape = tuple([0, n_channels] + list(image_shape))   # e.g. (0, 4, 144, 144, 144)
    truth_shape = tuple([0, 1] + list(image_shape))           # e.g. (0, 1, 144, 144, 144)

    data_storage = hdf5_file.create_earray(hdf5_file.root, 'data', tables.Float32Atom(), shape=data_shape,
                                           filters=compression, expectedrows=n_samples)
    truth_storage = hdf5_file.create_earray(hdf5_file.root, 'truth', tables.UInt8Atom(), shape=truth_shape,
                                            filters=compression, expectedrows=n_samples)
    # one 4x4 affine transform per subject
    affine_storage = hdf5_file.create_earray(hdf5_file.root, 'affine', tables.Float32Atom(), shape=(0, 4, 4),
                                             filters=compression, expectedrows=n_samples)
    return hdf5_file, data_storage, truth_storage, affine_storage


def write_image_data_to_file(image_files, data_storage, truth_storage, image_shape, n_channels, affine_storage,
                             truth_dtype=np.uint8, crop=True):
    """
    Read, (optionally) crop and resize every subject, then append the result
    to the hdf5 storages.

    :param image_files: list of tuples of modality paths + ground-truth path
    :param image_shape: target shape per image
    :param n_channels: number of modalities per subject
    :param truth_dtype: dtype used for the ground-truth array
    :param crop: when True, crop each subject to its foreground
    :return: (data_storage, truth_storage)
    """
    for subject_files in image_files:
        # subject_files example: ('sub1-T1.nii.gz', 'sub1-T2.nii.gz',
        # 'sub1-flair.nii.gz', 'sub1-t1ce.nii.gz', 'sub1-truth.nii.gz');
        # the last entry is treated as the label file
        images = reslice_image_set(subject_files, image_shape,
                                   label_indices=len(subject_files) - 1, crop=crop)

        # data arrays of the 4 modalities + truth
        subject_data = [image.get_data() for image in images]

        # images[0].affine is the subject's 4x4 voxel-to-world transform
        add_data_to_storage(data_storage, truth_storage, affine_storage,
                            subject_data, images[0].affine, n_channels, truth_dtype)
    return data_storage, truth_storage


def add_data_to_storage(data_storage, truth_storage, affine_storage, subject_data, affine, n_channels, truth_dtype):
    """
    Append one subject to the three storages. A leading batch axis is added
    so each appended array matches the extendable shape declared in
    create_data_file.
    """
    modalities = np.asarray(subject_data[:n_channels])               # (C, X, Y, Z)
    data_storage.append(modalities[np.newaxis])                      # -> (1, C, X, Y, Z)
    truth = np.asarray(subject_data[n_channels], dtype=truth_dtype)  # (X, Y, Z)
    truth_storage.append(truth[np.newaxis][np.newaxis])              # -> (1, 1, X, Y, Z)
    affine_storage.append(np.asarray(affine)[np.newaxis])            # (4, 4) -> (1, 4, 4)

 对nii image根据综合4个模态+truth和设置的background像素值获取foreground,并裁剪后再放缩至指定大小的过程如下

获取foreground并裁剪的效果可看另一篇博客:

def reslice_image_set(in_files, image_shape, out_files=None, label_indices=None, crop=False):
    """
    Read a subject's images, optionally crop them to the joint foreground,
    and resize them to image_shape.

    :param in_files: tuple of modality paths plus the ground-truth path
    :param image_shape: target shape
    :param out_files: optional output paths; when given, images are saved and
        their absolute paths returned instead of the images themselves
    :param label_indices: index (or indices) of the label file(s) in in_files
    :param crop: when True, compute per-axis crop slices from the foreground
    :return: list of processed images, or the saved file paths
    """
    # per-axis crop ranges [slice(), slice(), slice()], or None for no crop
    crop_slices = get_cropping_parameters([in_files]) if crop else None
    # crop and resize every file of the subject
    images = read_image_files(in_files, image_shape=image_shape, crop=crop_slices, label_indices=label_indices)
    if not out_files:
        return images
    for image, out_file in zip(images, out_files):
        image.to_filename(out_file)
    return [os.path.abspath(out_file) for out_file in out_files]
def get_cropping_parameters(in_files):
    """
    Compute per-axis crop slices covering the foreground of the input files.

    :param in_files: list of tuples of image paths
        (e.g. [('sub1-T1.nii.gz', ..., 'sub1-truth.nii.gz')])
    :return: per-axis crop ranges [slice(), slice(), slice()]
    """
    if len(in_files) > 1:
        foreground = get_complete_foreground(in_files)
    else:
        # single subject: build a binary foreground image from its file set
        foreground = get_foreground_from_set_of_files(in_files[0], return_image=True)
    # return_slices=True -> return the crop ranges instead of a cropped image
    return crop_img(foreground, return_slices=True, copy=True)
def get_foreground_from_set_of_files(set_of_files, background_value=0, tolerance=0.00001, return_image=False):
    """
    Build a binary foreground mask over all files of one subject.

    A voxel is foreground when, in any of the files, its value differs from
    background_value by more than tolerance.

    :param set_of_files: tuple of modality paths plus the ground-truth path
    :param background_value: intensity treated as background
    :param tolerance: tolerance band around background_value
    :param return_image: when True, wrap the mask in a nifti image
    :return: uint8 mask array, or a nifti image of the mask
    """
    foreground = None
    for image_file in set_of_files:
        # read the image without cropping or resizing
        image = read_image(image_file)
        data = image.get_data()
        is_foreground = np.logical_or(data < (background_value - tolerance),
                                      data > (background_value + tolerance))
        if foreground is None:
            foreground = np.zeros(is_foreground.shape, dtype=np.uint8)
        # mark foreground voxels with 1
        foreground[is_foreground] = 1
    if return_image:
        # reuse the geometry of the last image read
        return new_img_like(image, foreground)
    return foreground
def read_image(in_file, image_shape=None, interpolation='linear', crop=None):
    """
    Load a nifti image and optionally crop and resize it.

    :param in_file: image path
    :param image_shape: target shape; when None the image is not resized
    :param interpolation: resampling mode ('linear' or 'nearest')
    :param crop: per-axis crop ranges, e.g.
        [(45, 192, None), (47, 210, None), (0, 137, None)] for X, Y, Z
    :return: the processed image
    """
    print("Reading: {0}".format(in_file))
    image = fix_shape(nib.load(os.path.abspath(in_file)))
    if crop:
        image = crop_img_to(image, crop, copy=True)
    if not image_shape:
        return image
    # resize to the requested shape
    return resize(image, new_shape=image_shape, interpolation=interpolation)


def fix_shape(image):
    """
    Drop a trailing singleton dimension from an image, if present.

    :param image: nifti-like image
    :return: a new image of the same class with the squeezed data (and the
        original affine), or the unchanged input when no fix is needed
    """
    if image.shape[-1] != 1:
        return image
    # rebuild an image of the same class from the squeezed data array
    squeezed = np.squeeze(image.get_data())
    return image.__class__(dataobj=squeezed, affine=image.affine)
def crop_img(img, rtol=1e-8, copy=True, return_slices=False):
    """
    Crop an image to the bounding box of its non-trivial voxels.

    A voxel counts as non-trivial when its absolute value exceeds
    rtol * max(|data|).

    :param img: niimg-like input (validated via check_niimg)
    :param rtol: relative tolerance for the threshold
    :param copy: whether crop_img_to should copy the data
    :param return_slices: when True, return the crop slices instead of cropping
    :return: [slice, slice, slice], or the cropped image
    """
    img = check_niimg(img)
    data = img.get_data()

    # threshold relative to the largest absolute intensity
    infinity_norm = max(-data.min(), data.max())
    passes_threshold = np.logical_or(data < -rtol * infinity_norm,
                                     data > rtol * infinity_norm)
    if data.ndim == 4:
        # collapse the trailing (e.g. channel) axis
        passes_threshold = np.any(passes_threshold, axis=-1)

    coords = np.array(np.where(passes_threshold))
    # pad the bounding box with one voxel to avoid resampling problems
    start = np.maximum(coords.min(axis=1) - 1, 0)
    end = np.minimum(coords.max(axis=1) + 1 + 1, data.shape[:3])

    slices = [slice(lo, hi) for lo, hi in zip(start, end)]
    if return_slices:
        return slices
    return crop_img_to(img, slices, copy=copy)
def read_image_files(image_files, image_shape=None, crop=None, label_indices=None):
    """
    Read, crop and resize a subject's image files.

    :param image_files: tuple of modality paths plus the ground-truth path,
        e.g. ('sub1-T1.nii.gz', 'sub1-T2.nii.gz', 'sub1-flair.nii.gz',
              'sub1-t1ce.nii.gz', 'sub1-truth.nii.gz')
    :param image_shape: target shape, e.g. (144, 144, 144)
    :param crop: per-axis crop slices [slice(), slice(), slice()]
    :param label_indices: index (or iterable of indices) of label file(s)
        within image_files. Labels are resampled with nearest-neighbour
        interpolation so their discrete values are not corrupted; all other
        files use linear interpolation. When None, the last file is treated
        as the label file.
    :return: list of the cropped/resized images
    """
    if label_indices is None:
        label_indices = []
    elif isinstance(label_indices, str) or not isinstance(label_indices, collections.abc.Iterable):
        # allow a single index as well as an iterable of indices
        # (collections.Iterable was removed in Python 3.10 — use collections.abc)
        label_indices = [label_indices]
    image_list = list()
    for index, image_file in enumerate(image_files):
        # nearest-neighbour for ground truth, linear for the modalities
        if (label_indices is None and (index + 1) == len(image_files)) \
                or (label_indices is not None and index in label_indices):
            interpolation = "nearest"
        else:
            interpolation = "linear"
        # crop then resize
        image_list.append(read_image(image_file, image_shape=image_shape, crop=crop, interpolation=interpolation))

    return image_list
def resize(image, new_shape, interpolation="linear"):
    """
    Resample an image to new_shape by adjusting its voxel spacing.

    :param image: input nifti image
    :param new_shape: target shape
    :param interpolation: resampling mode passed to resample_to_spacing
    :return: a new image with resampled data and an updated affine
    """
    image = reorder_img(image, resample=interpolation)
    # spacing that produces new_shape over the same physical extent
    zoom = np.divide(new_shape, image.shape)
    new_spacing = np.divide(image.header.get_zooms(), zoom)
    resampled = resample_to_spacing(image.get_data(), image.header.get_zooms(), new_spacing,
                                    interpolation=interpolation)
    updated_affine = np.copy(image.affine)
    np.fill_diagonal(updated_affine, new_spacing.tolist() + [1])
    # shift the origin so the resampled volume stays centred on the same spot
    updated_affine[:3, 3] += calculate_origin_offset(new_spacing, image.header.get_zooms())
    return new_img_like(image, resampled, affine=updated_affine)

对训练集所有模态image归一化

def normalize_data(data, mean, std):
    """
    Z-score one subject's data in place, channel by channel.

    :param data: array of shape (channels, X, Y, Z)
    :param mean: per-channel means, shape (channels,)
    :param std: per-channel standard deviations, shape (channels,)
    :return: the normalized array (same object as data)
    """
    # index that reshapes (channels,) stats to (channels, 1, 1, 1) for broadcasting
    broadcast = (slice(None), np.newaxis, np.newaxis, np.newaxis)
    data -= mean[broadcast]
    data /= std[broadcast]
    return data


def normalize_data_storage(data_storage):
    """
    Z-score every subject in the storage using per-channel statistics
    averaged over the whole training set.

    :param data_storage: array-like of shape (n_subjects, channels, X, Y, Z)
    :return: data_storage with every subject normalized
    """
    n_subjects = data_storage.shape[0]
    # per-subject, per-channel statistics, each of shape (channels,)
    subject_means = [data_storage[i].mean(axis=(1, 2, 3)) for i in range(n_subjects)]
    subject_stds = [data_storage[i].std(axis=(1, 2, 3)) for i in range(n_subjects)]
    # average over subjects: (n_subjects, channels) -> (channels,)
    mean = np.asarray(subject_means).mean(axis=0)
    std = np.asarray(subject_stds).mean(axis=0)
    # normalize every subject with the set-wide statistics
    for i in range(n_subjects):
        data_storage[i] = normalize_data(data_storage[i], mean, std)
    return data_storage