# Stack manipulations
# Author: David Young, 2017, 2019
"""Import and export image stacks in various formats.
"""
from collections import OrderedDict
import os
import glob
from typing import List, Optional, Sequence
import numpy as np
from skimage import transform
from skimage import io
from matplotlib import animation
from matplotlib.image import AxesImage
from scipy import ndimage
from magmap.cv import chunking, cv_nd
from magmap.plot import colormaps
from magmap.settings import config
from magmap.io import importer
from magmap.io import libmag
from magmap.io import np_io
from magmap.plot import plot_3d
from magmap.plot import plot_support
_logger = config.logger.getChild(__name__)
[docs]
class StackPlaneIO(chunking.SharedArrsContainer):
"""Worker class to export planes from a stack with support for
multiprocessing.
Attributes:
imgs: A list of images, with exact specification determined by
the calling function.
images: Sequence of images. For import, each "image" is a path to
and image file. For export, each "image" is a sequence of
planes, with the first sequence assumed to an atlas,
followed by labels-based images, each consisting of
corresponding planes.
fn_process: Function to process each image through multiprocessing,
where the function should take an index and image and return the
index and processed plane.
rescale (float): Rescale factor; defaults to 1.
cmaps_labels: Sequence of colormaps for labels-based images;
defaults to None. Length should be equal to that of
``images`` - 1.
start_planei (int): Index of start plane, used for labeling the
plane; defaults to 0. The plane is only annotated when
:attr:`config.plot_labels[config.PlotLabels.TEXT_POS]` is given
to specify the position of the text in ``x,y`` relative to the
axes.
"""
imgs = None
#: Interpolation order, used in :meth:`skimage.transform.resize`.
interp_order: Optional[int] = None
def __init__(self):
super().__init__()
self.images = None
self.fn_process = None
self.rescale = 1
self.start_planei = 0
self.origin = None
self.aspect = None
self.cmaps_labels = None
self.img_slice = None
[docs]
@classmethod
def set_data(cls, imgs):
"""Set data to be accessed by worker functions."""
cls.imgs = imgs
[docs]
@classmethod
def convert_imgs(cls):
"""Restore all shared arrays to a list of arrays."""
if cls.imgs is None:
cls.imgs = []
for key in cls.shared_arrs.keys():
cls.imgs.append(cls.convert_shared_arr(key))
[docs]
@classmethod
def import_img(cls, i, rescale, multichannel):
"""Import and rescale an image.
Assumes that :attr:``imgs`` is a list of paths to 2D images.
Args:
i: Index within :attr:``imgs`` to plot.
rescale: Rescaling multiplier.
multichannel: True if the images are multichannel.
Returns:
A tuple of ``i`` and a list of the processed images. The
processed image list has the same length as :attr:``imgs``,
or the number of image paths.
"""
# TODO: consider removing since imported image is not saved;
# should import first before exporting
cls.convert_imgs()
path = cls.imgs[i]
print("importing {}".format(path))
img = io.imread(path)
img = transform.rescale(
img, rescale, mode="reflect", multichannel=multichannel,
preserve_range=True, anti_aliasing=True)
return i, img
[docs]
@classmethod
def process_plane(cls, i, target_size, multichannel, rotate=None):
"""Process corresponding planes from related images.
Assumes that :attr:``imgs`` is a list of nested 2D image lists,
where the first nested list is assumed to be a sequence of
histology image planes, while subsequent images are
labels-based images.
Args:
i: Index within nested lists of :attr:``imgs`` to plot.
target_size: Resize to this shape.
multichannel: True for multichannel images.
rotate: Degrees by which to rotate; defaults to None.
Returns:
A tuple of ``i`` and a list of the processed images. The
processed image list has the same length as :attr:``imgs``,
or the number of nested lists.
"""
print("processing plane {}".format(i))
cls.convert_imgs()
imgs_proc = []
for j, img_stack in enumerate(cls.imgs):
order = cls.interp_order if j == 0 else 0
img = cv_nd.rescale_resize(
img_stack[i], target_size, multichannel, True, order=order)
imgs_proc.append(img)
if rotate:
# TODO: consider removing since not saved; should instead rotate
# image using transform task
# rotate, filling background with edge color
for j, img in enumerate(imgs_proc):
#img = img[10:-100, 10:-80] # manually crop out any border
cval = np.mean(img[0, 0])
img = ndimage.rotate(img, rotate, cval=cval)
# additional cropping for oblique bottom edge after rotation
#img = img[:-50]
imgs_proc[j] = img
return i, imgs_proc
[docs]
def build_stack(
self, axs: List, scale_bar: bool = True, fit: bool = False
) -> Optional[List]:
"""Builds a stack of Matploblit 2D images.
Uses multiprocessing to load or resize each image.
Args:
axs: Sub-plot axes.
scale_bar: True to include scale bar; defaults to True.
fit: True to fit the figure frame to the resulting image.
Returns:
:List[List[:obj:`matplotlib.image.AxesImage`]]: Nested list of
axes image objects. The first list level contains planes, and
the second level are channels within each plane.
"""
def handle_extracted_plane():
# get sub-plot and hide x/y axes
ax = axs
if libmag.is_seq(ax):
ax = axs[imgi]
plot_support.hide_axes(ax)
# multiple artists can be shown at each frame by collecting
# each group of artists in a list; overlay_images returns
# a nested list containing a list for each image, which in turn
# contains a list of artists for each channel
overlaid = plot_support.ImageOverlayer(
ax, self.aspect, self.origin, ignore_invis=True, rgb=config.rgb)
ax_imgs = overlaid.overlay_images(
imgs, None, cmaps_all, check_single=True)
if (colorbar is not None and len(ax_imgs) > 0
and len(ax_imgs[0]) > 0 and imgi == 0):
# add colorbar with scientific notation if outside limits
cbar = ax.figure.colorbar(ax_imgs[0][0], ax=ax, **colorbar)
plot_support.set_scinot(cbar.ax, lbls=None, units=None)
plotted_imgs[imgi] = np.array(ax_imgs).flatten()
if libmag.is_seq(text_pos) and len(text_pos) > 1:
# write plane index in axes rather than data coordinates
text = ax.text(
*text_pos[:2], "{}-plane: {}".format(
plot_support.get_plane_axis(config.plane),
self.start_planei + imgi),
transform=ax.transAxes, color="w")
plotted_imgs[imgi] = [*plotted_imgs[imgi], text]
if scale_bar:
plot_support.add_scale_bar(ax, 1 / self.rescale, config.plane)
# number of image types (eg atlas, labels) and corresponding planes
num_image_types = len(self.images)
if num_image_types < 1: return None
num_images = len(self.images[0])
if num_images < 1: return None
# import the images as Matplotlib artists via multiprocessing
plotted_imgs: List = [None] * num_images
img_shape = self.images[0][0].shape
target_size = np.multiply(img_shape, self.rescale).astype(int)
multichannel = self.images[0][0].ndim >= 3
if multichannel:
print("building stack for channel: {}".format(config.channel))
target_size = target_size[:-1]
# setup imshow parameters
colorbar = config.roi_profile["colorbar"]
cmaps_all = [config.cmaps, *self.cmaps_labels]
text_pos = config.plot_labels[config.PlotLabels.TEXT_POS]
StackPlaneIO.set_data(self.images)
pool_results = None
pool = None
multiprocess = self.rescale != 1
if multiprocess:
# set up multiprocessing
initializer = None
initargs = None
if not chunking.is_fork():
# set up labels image as a shared array for spawned mode
initializer, initargs = StackPlaneIO.build_pool_init(
OrderedDict([
(i, img) for i, img in enumerate(self.images)]))
pool = chunking.get_mp_pool(initializer, initargs)
pool_results = []
for i in range(num_images):
# add rotation argument if necessary
args = (i, target_size, multichannel)
if pool is None:
# extract and handle without multiprocessing
imgi, imgs = self.fn_process(*args)
handle_extracted_plane()
else:
# extract plane in multiprocessing
pool_results.append(
pool.apply_async(self.fn_process, args=args))
if multiprocess:
# handle multiprocessing output
for result in pool_results:
imgi, imgs = result.get()
handle_extracted_plane()
pool.close()
pool.join()
if fit and plotted_imgs:
# fit each figure to its first available image
for ax_img in plotted_imgs:
# images may be flattened AxesImage, array of AxesImage and
# Text, or None if alpha set to 0
if libmag.is_seq(ax_img):
ax_img = ax_img[0]
if isinstance(ax_img, AxesImage):
ax_img.axes.set_frame_on(False)
plot_support.fit_frame_to_image(
ax_img.figure, None, self.aspect)
return plotted_imgs
[docs]
def animate_imgs(base_path, plotted_imgs, delay, ext=None, suffix=None):
"""Export to an animated image.
Defaults to an animated GIF unless ``ext`` specifies otherwise.
Requires ``FFMpeg`` for MP4 file format exports and ``ImageMagick`` for
all other types of exports.
Args:
base_path (str): String from which an output path will be constructed.
plotted_imgs (List[:obj:`matplotlib.image.AxesImage]): Sequence of
images to include in the animation.
delay (int): Delay between image display in ms. If None, the delay will
defaul to 100ms.
ext (str): Extension to use when saving, without the period. Defaults
to None, in which case "gif" will be used.
suffix (str): String to append to output path before extension;
defaults to None to ignore.
"""
# set up animation output path and time interval
if ext is None: ext = "gif"
out_path = libmag.combine_paths(base_path, "animated", ext=ext)
if suffix: out_path = libmag.insert_before_ext(out_path, suffix, "_")
libmag.backup_file(out_path)
if delay is None:
delay = 100
if plotted_imgs and len(plotted_imgs[0]) > 0:
fig = plotted_imgs[0][0].figure
else:
libmag.warn("No images available to animate")
return
# WORKAROUND: FFMpeg may give a "height not divisible by 2" error, fixed
# by padding with a pixel
# TODO: check if needed for width
# TODO: account for difference in FFMpeg height and fig height
for fn, size in {
# fig.set_figwidth: fig.get_figwidth(),
fig.set_figheight: fig.get_figheight()}.items():
if size * fig.dpi % 2 != 0:
fn(size + 1. / fig.dpi)
print("Padded size with", fn, fig.get_figwidth(), "to new size of",
fig.get_figheight())
# generate and save animation
anim = animation.ArtistAnimation(
fig, plotted_imgs, interval=delay, repeat_delay=0, blit=False)
try:
writer = "ffmpeg" if ext == "mp4" else "imagemagick"
anim.save(out_path, writer=writer)
print("saved animation file to {}".format(out_path))
except ValueError as e:
print(e)
libmag.warn("No animation writer available for Matplotlib")
def _setup_labels_cmaps(imgs, cmaps_labels=None):
"""Set up labels colormaps for registered images.
Args:
imgs (List[:obj:`np.ndarray`]): Sequence of images where the first
is assumed to be non-labels, the second is labels, and the
third are label borders.
cmaps_labels (List[List[
:class:`magmap.plot.colormaps.DiscreteColormap`]]):
List of discrete colormaps corresponding to ``imgs[:1]``.
Returns:
list: List of colormaps for ``[labels_img, borders_img]``.
"""
if cmaps_labels is None:
cmaps_labels = []
num_imgs = len(imgs)
if num_imgs > 1:
# get colormap for 2nd image, the main labels image
cmaps_labels.append(config.cmap_labels)
if num_imgs > 2:
# subsequent image's colormap is based on first labels if possible
cmaps_labels.append(
colormaps.get_borders_colormap(
imgs[2], imgs[1], cmaps_labels[0]))
return cmaps_labels
[docs]
def setup_stack(
image5d: np.ndarray, path: Optional[str] = None,
offset: Optional[Sequence[int]] = None,
roi_size: Optional[Sequence[int]] = None,
slice_vals: Optional[Sequence[int]] = None,
rescale: Optional[float] = None,
labels_imgs: Optional[Sequence[np.ndarray]] = None
) -> StackPlaneIO:
"""Set up a stack of images for export to file.
Supports a stack of image files in a directory or a single volumetric image
and associated labels images.
Args:
image5d: Images as a 4/5D Numpy array (t,z,y,x[c]). Can be None if
``path`` is set.
path: Path to an image directory from which all files will be imported
in Python sorted order, taking precedence over ``imaged5d``;
defaults to None.
offset: Tuple of offset given in user order (x, y, z); defaults to
None. Requires ``roi_size`` to not be None.
roi_size: Size of the region of interest in user order (x, y, z);
defaults to None. Requires ``offset`` to not be None.
slice_vals: List from which to construct a slice object to
extract only a portion of the image. Defaults to None, which
will give the whole image. If ``offset`` and ``roi_size`` are
also given, ``slice_vals`` will only be used for its interval term.
rescale: Rescaling factor for each image, performed on a plane-by-plane
basis; defaults to None, in which case 1.0 will be used.
labels_imgs: Sequence of labels-based images as a Numpy z,y,x arrays,
typically including labels and borders images; defaults to None.
Returns:
Stack builder instance.
"""
print("Starting image stack setup")
# build "z" slice, which will be applied to the transposed image
interval = 1 # default to export each plane
if offset is not None and roi_size is not None:
# extract planes based on ROI settings
# transpose coordinates to given plane
_, arrs_1d = plot_support.transpose_images(
config.plane, arrs_1d=[offset[::-1], roi_size[::-1]])
offset = arrs_1d[0][::-1]
roi_size = arrs_1d[1][::-1]
# ROI offset and size take precedence over slice vals except
# for use of the interval term
if slice_vals is not None and len(slice_vals) > 2:
interval = slice_vals[2]
size = roi_size[2]
img_sl = slice(offset[2], offset[2] + size, interval)
if interval is not None and interval < 0:
# reverse start/stop order to iterate backward
img_sl = slice(img_sl.stop, img_sl.start, interval)
print("using ROI offset {}, size {}, {}"
.format(offset, size, img_sl))
elif slice_vals:
# build directly from slice vals, replacing start and step if None
sl = slice(*slice_vals)
sl = [sl.start, sl.stop, sl.step]
if sl[0] is None:
# default to start at beginning of stack
sl[0] = 0
if sl[2] is None:
# default to interval/step of 1
sl[2] = 1
img_sl = slice(*sl)
else:
# default to take the whole image stack
img_sl = slice(0, None, 1)
if rescale is None:
rescale = 1.0
aspect = None
origin = None
cmaps_labels = []
extracted_planes = []
start_planei = 0
if path and os.path.isdir(path):
# builds animations from all files in a directory
planes = sorted(glob.glob(os.path.join(path, "*")))[::interval]
_logger.info("Importing images from %s: %s", path, planes)
fnc = StackPlaneIO.import_img
extracted_planes.append(planes)
else:
# load images from path and extract ROI based on slice parameters,
# assuming 1st image is atlas, 2nd and beyond are labels-based
imgs = [image5d]
if labels_imgs is not None:
for img in labels_imgs:
if img is not None: imgs.append(img[None])
_setup_labels_cmaps(imgs, cmaps_labels)
main_shape = None # z,y,x shape of 1st image
for img in imgs:
sl = img_sl
img_shape = img.shape[1:4]
if main_shape:
if main_shape != img_shape:
# scale slice bounds to the first image's shape
scaling = np.divide(img_shape, main_shape)
axis = plot_support.get_plane_axis(config.plane, True)
sl = libmag.scale_slice(
sl, scaling[axis], img_shape[axis])
else:
main_shape = img_shape
planes, aspect, origin = plot_support.extract_planes(
img, sl, plane=config.plane)
if offset is not None and roi_size is not None:
# get ROI using transposed coordinates on transposed planes;
# returns list
planes = planes[
:, offset[1]:offset[1]+roi_size[1],
offset[0]:offset[0]+roi_size[0]]
extracted_planes.append(planes)
fnc = StackPlaneIO.process_plane
if img_sl.start:
start_planei = img_sl.start
# store in stack worker
stacker = StackPlaneIO()
StackPlaneIO.interp_order = config.transform[
config.Transforms.INTERPOLATION]
stacker.images = extracted_planes
stacker.fn_process = fnc
stacker.rescale = rescale
stacker.start_planei = start_planei
stacker.origin = origin
stacker.aspect = aspect
stacker.cmaps_labels = cmaps_labels
stacker.img_slice = img_sl
return stacker
[docs]
def stack_to_img(paths, roi_offset, roi_size, series=None, subimg_offset=None,
subimg_size=None, animated=False, suffix=None):
"""Build an image file from a stack of images in a directory or an
array, exporting as an animated GIF or movie for multiple planes or
extracting a single plane to a standard image file format.
Writes the file to the parent directory of path.
Args:
paths (List[str]): Image paths, which can each be either an image
directory or a base path to a single image, including
volumetric images. If more than one path is given, a collage of
images will be generated with the layout set by
:attr:`magmap.settings.config.plot_labels[config.PlotLabels.LAYOUT]`.
roi_offset (Sequence[int]): Tuple of offset given in user order
``x,y,z``; defaults to None. Requires ``roi_size`` to not be None.
roi_size (Sequence[int]): Size of the region of interest in user order
``x,y,z``; defaults to None. Requires ``roi_offset`` to not be None.
series (int): Image series number; defaults to None.
subimg_offset (List[int]): Sub-image offset as (z,y,x) to load;
defaults to None.
subimg_size (List[int]): Sub-image size as (z,y,x) to load;
defaults to None.
animated (bool): True to export as an animated image; defaults to False.
suffix (str): String to append to output path before extension;
defaults to None to ignore.
"""
# set up figure layout for collages
size = config.plot_labels[config.PlotLabels.LAYOUT]
ncols, nrows = size if size else (1, 1)
num_paths = len(paths)
collage = num_paths > 1
figs = {}
path_base = paths[0]
for i in range(nrows):
for j in range(ncols):
# load image and display selected planes:
# - for collages, typically one plane of the image is shown as a
# subplot of a single fig
# - for non-collages: one or more planes are shown, each in a
# separate fig and axes except animations, which share one fig
n = i * ncols + j
if n >= num_paths: break
# load an image and set up its image stacker
path_sub = paths[n]
axs = []
# TODO: test directory of images
# TODO: consider not reloading first image
np_io.setup_images(path_sub, series, subimg_offset, subimg_size)
if config.img5d.img_io is config.LoadIO.TIFFFILE:
# remove extension from TIF files, which needed ext to load
path_base = libmag.get_filename_without_ext(path_base)
stacker = setup_stack(
config.image5d, path_sub, offset=roi_offset,
roi_size=roi_size, slice_vals=config.slice_vals,
rescale=config.transform[config.Transforms.RESCALE],
labels_imgs=(config.labels_img, config.borders_img))
# add sub-plot title unless groups given as empty string
title = None
if config.groups:
title = libmag.get_if_within(config.groups, n)
elif num_paths > 1:
title = os.path.basename(path_sub)
if not stacker.images: continue
ax = None
for k in range(len(stacker.images[0])):
# create or retrieve fig for the given images; animation and
# collage have only 1 fig, whereas stack of multiple planes
# has a separate fig for each plane
planei = 0 if animated else (
stacker.img_slice.start + k * stacker.img_slice.step)
fig_dict = figs.get(planei)
if not fig_dict:
# set up new fig for each new plane unless animated
fig, gs = plot_support.setup_fig(
nrows, ncols, config.plot_labels[config.PlotLabels.SIZE])
fig_dict = {"fig": fig, "gs": gs, "imgs": []}
figs[planei] = fig_dict
if ax is None or not animated:
# generate new axes for the gridspec position; share
# axes for animation
ax = fig_dict["fig"].add_subplot(fig_dict["gs"][i, j])
if title:
ax.title.set_text(title)
axs.append(ax)
# export planes
plotted_imgs = stacker.build_stack(
axs, config.plot_labels[config.PlotLabels.SCALE_BAR],
size is None or ncols * nrows == 1)
if animated:
# store all plotted images in single fig
fig_dict = figs.get(0)
if fig_dict:
fig_dict["imgs"] = plotted_imgs
else:
# store one plotted image per fig; not used currently
for fig_dict, img in zip(figs.values(), plotted_imgs):
fig_dict["imgs"].append(img)
for planei, fig_dict in figs.items():
if animated:
# generate animated image (eg animated GIF or movie file)
animate_imgs(
path_base, fig_dict["imgs"], config.delay, config.savefig,
suffix)
else:
# generate single figure with axis and plane index in filename
if collage:
# output filename as a collage of images
if not os.path.isdir(path_base):
path_base = os.path.dirname(path_base)
path_base = os.path.join(path_base, "collage")
# insert mod as suffix, then add any additional suffix;
# can use config.prefix_out for make_out_path prefix
mod = "_plane_{}{}".format(
plot_support.get_plane_axis(config.plane), planei)
out_path = libmag.make_out_path(path_base, suffix=mod)
if suffix:
out_path = libmag.insert_before_ext(out_path, suffix)
plot_support.save_fig(
out_path, config.savefig, fig=fig_dict["fig"])
[docs]
def reg_planes_to_img(imgs, path=None, ax=None):
"""Export registered image single planes to a single figure.
Simplified export tool taking a single plane from each registered image
type, overlaying in a single figure, and exporting to file.
Args:
imgs (List[:obj:`np.ndarray`]): Sequence of image planes to display.
The first image is assumed to be greyscale, the second is labels,
and any subsequent images are borders.
path (str): Output base path, which will be combined with
:attr:`config.savefig`; defaults to None to not save.
ax (:obj:`matplotlib.image.Axes`): Axes on which to plot; defaults
to False, in which case a new figure and axes will be generated.
"""
if ax is None:
# set up new figure with single subplot
fig, gs = plot_support.setup_fig(
1, 1, config.plot_labels[config.PlotLabels.SIZE])
ax = fig.add_subplot(gs[0, 0])
stacker = StackPlaneIO()
StackPlaneIO.interp_order = config.transform[
config.Transforms.INTERPOLATION]
stacker.images = [img[None] for img in imgs]
stacker.fn_process = StackPlaneIO.process_plane
stacker.cmaps_labels = _setup_labels_cmaps(imgs)
plotted_imgs = stacker.build_stack(ax, scale_bar=False)
ax_img = plotted_imgs[0][0]
aspect, origin = plot_support.get_aspect_ratio(config.plane)
ax_img.axes.set_frame_on(False)
plot_support.fit_frame_to_image(
ax_img.figure, ax_img.get_array().shape, aspect)
if path:
plot_support.save_fig(path, config.savefig)
[docs]
def export_planes(
image5d: np.ndarray, ext: str, channel: Optional[int] = None,
separate_chls: bool = False):
"""Export all planes of a 3D+ image into separate 2D image files.
Unlike :meth:`stack_to_img`, this method exports raw planes and
each channels into separate files, without processing through Matplotlib.
Supports image rotation set in :attr:`magmap.settings.config.transform`.
By default, all z-planes are exported, with plane indices specified through
:attr:`config.slice_vals`. Alternatively, regions of interest can be
specified by :attr:`config.roi_offset` and :attr:`config.roi_size`.
The planar orientation can be configured through :attr:`config.plane`.
Args:
image5d: Image in ``t,z,y,x[,c]`` format.
ext: Save format given as an extension without period.
channel: Channel to save; defaults to None for all channels.
separate_chls (bool): True to export all channels from each plane to
a separate image; defaults to False.
"""
# set up output path
suffix = "_export" if config.suffix is None else config.suffix
out_path = libmag.make_out_path(suffix=suffix)
output_dir = os.path.dirname(out_path)
basename = os.path.splitext(os.path.basename(out_path))[0]
if not os.path.isdir(output_dir):
os.makedirs(output_dir)
# set up image and apply any rotation
roi = image5d[0]
multichannel, channels = plot_3d.setup_channels(roi, channel, 3)
rotate = config.transform[config.Transforms.ROTATE]
roi = cv_nd.rotate90(roi, rotate, multichannel=multichannel)
stacker = setup_stack(
roi[np.newaxis, :], offset=config.roi_offset, roi_size=config.roi_size,
slice_vals=config.slice_vals,
rescale=config.transform[config.Transforms.RESCALE])
roi = stacker.images[0]
num_planes = len(roi)
img_sl = stacker.img_slice
for i, plane in enumerate(roi):
# add plane to output path
out_name = f"{basename}_plane_" \
f"{plot_support.get_plane_axis(config.plane)}" \
f"{img_sl.start + img_sl.step * i}"
path = os.path.join(output_dir, out_name)
if separate_chls and multichannel:
for chl in channels:
# save each channel as separate file
plane_chl = plane[..., chl]
path_chl = "{}{}{}.{}".format(
path, importer.CHANNEL_SEPARATOR, chl, ext)
print("Saving image plane {} to {}".format(i, path_chl))
io.imsave(path_chl, plane_chl)
else:
# save single channel plane
path = "{}.{}".format(path, ext)
print("Saving image plane {} to {}".format(i, path))
io.imsave(path, plane)
if __name__ == "__main__":
print("MagellanMapper stack manipulations")