# Regional volume and density management
# Author: David Young, 2018, 2019
"""Measure volumes and densities by regions.
Intended to be higher-level, relatively atlas-agnostic measurements.
"""
from enum import Enum
from time import time
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from skimage import measure
from magmap.cv import chunking
from magmap.stats import atlas_stats, clustering
from magmap.settings import config
from magmap.io import libmag
from magmap.atlas import ontology
from magmap.cv import cv_nd
from magmap.io import df_io
_logger = config.logger.getChild(__name__)
# metric keys and column names
LabelMetrics = Enum(
"LabelMetrics", [
"Region",
"Volume", # volume, converted to physical units
"VolAlt", # alternate volume, eg smoothed volume
"VolPx", # volume in pixels
"VolAltPx", # alternate volume in pixels
"Intensity",
"Nuclei",
# densities; "Density" = nuclei density
# TODO: change density to nuclei density
# TODO: consider changing enum for KEY: name format
"Density", "DensityIntens",
"RegVolMean", "RegNucMean", "RegDensityMean", # per region
"VarNuclei", "VarNucIn", "VarNucOut",
"VarIntensity", "VarIntensIn", "VarIntensOut",
"MeanIntensity",
"MedIntensity",
"LowIntensity",
"HighIntensity",
"EntropyIntensity",
"VarIntensMatch",
"VarIntensDiff",
"MeanNuclei",
"VarNucMatch",
# Distances
"EdgeSize", # edge pixels
"EdgeDistSum", # sum of distances between edges in two images
"EdgeDistMean", # mean of these distances
"Dist", # generic distance
# Variation
"CoefVarIntens", "CoefVarNuc",
# Shape measurements
"SurfaceArea", "Compactness",
# Overlap metrics
"VolDSC", "NucDSC", # volume/nuclei Dice Similarity Coefficient
"VolOut", "NucOut", # volume/nuclei shifted out of orig position
# Point cloud measurements
"NucCluster", # number of nuclei clusters
"NucClusNoise", # number of nuclei that do not fit into a cluster
"NucClusLarg", # number of nuclei in the largest cluster
]
)
# variation metrics
VAR_METRICS = (
LabelMetrics.RegVolMean, LabelMetrics.RegNucMean,
LabelMetrics.VarNuclei, LabelMetrics.VarNucIn, LabelMetrics.VarNucOut,
LabelMetrics.VarIntensity, LabelMetrics.VarIntensIn,
LabelMetrics.VarIntensOut,
LabelMetrics.MeanIntensity,
LabelMetrics.MedIntensity,
LabelMetrics.LowIntensity,
LabelMetrics.HighIntensity,
LabelMetrics.EntropyIntensity,
LabelMetrics.VarIntensMatch,
LabelMetrics.VarIntensDiff,
LabelMetrics.MeanNuclei,
LabelMetrics.VarNucMatch,
LabelMetrics.CoefVarIntens, LabelMetrics.CoefVarNuc,
)
# nuclei metrics
NUC_METRICS = (
LabelMetrics.Nuclei,
LabelMetrics.RegNucMean,
LabelMetrics.MeanNuclei,
LabelMetrics.VarNuclei,
LabelMetrics.VarNucIn,
LabelMetrics.VarNucOut,
LabelMetrics.VarNucMatch,
LabelMetrics.CoefVarNuc,
)
# metrics computed from weighted averages
WT_METRICS = (
*VAR_METRICS,
LabelMetrics.EdgeDistMean,
)
def _coef_var(df):
# calculate coefficient of variation from data frame columns,
# where first column is std and second is mean
return np.divide(df.iloc[:, 0], df.iloc[:, 1])
[docs]
class MetricCombos(Enum):
"""Combinations of metrics.
Each combination should be a tuple of combination name, a
tuple of metric Enums, and a function to use for aggregation applied
across colums to give a new metric value for each row.
"""
# sum of columns measuring regional homogeneity; missing columns
# will be ignored
HOMOGENEITY = (
"Homogeneity",
(LabelMetrics.VarIntensity, #LabelMetrics.VarIntensDiff,
LabelMetrics.EdgeDistSum, LabelMetrics.VarNuclei),
lambda x: np.nanmean(x, axis=1))
# coefficient of variation of intensity values
COEFVAR_INTENS = (
"CoefVarIntensity",
(LabelMetrics.VarIntensity, LabelMetrics.MeanIntensity),
_coef_var)
# coefficient of variation of intensity values
COEFVAR_NUC = (
"CoefVarNuclei",
(LabelMetrics.VarNuclei, LabelMetrics.MeanNuclei),
_coef_var)
[docs]
class LabelToEdge(chunking.SharedArrsContainer):
"""Convert a labels image to its edge
Provides class methods for encapsulation and shared arrays to use in
multiprocessing without requirement for global variables.
"""
#: Labels image.
labels_img: np.ndarray = None
#: Structuring element footprint for generating the edge, where larger
#: footprints given thicker edges.
footprint: Sequence[int] = None
[docs]
@classmethod
def find_label_edge(cls, label_id: int) -> Tuple[int, Sequence, np.ndarray]:
"""Convert a label into just its border.
Args:
label_id: Integer of the label to extract from
:attr:`labels_img_np`.
Returns:
Tuple of:
- ``label ID``: the label ID
- ``slices``: list of slices defining the location of the ROI
where the edges can be found
- ``borders``: ROI as a volume mask defining where the edges exist
"""
_logger.info("Getting edge for %s", label_id)
borders = None
# extract region from full labels image
if cls.labels_img is None:
cls.labels_img = cls.convert_shared_arr(config.RegNames.IMG_LABELS)
region, slices = cv_nd.extract_region(cls.labels_img, label_id)
if region is not None:
# get border of label
label_mask_region = region == label_id
borders = cv_nd.perimeter_nd(
label_mask_region, footprint=cls.footprint)
else:
_logger.warn(
"Could not find region '%s' to generate edge", label_id)
return label_id, slices, borders
[docs]
@classmethod
def make_labels_edge(cls, labels_img_np: np.ndarray) -> np.ndarray:
"""Convert labels image into label borders image.
The atlas is assumed to be a sample (eg microscopy) image on which
an edge-detection filter will be applied.
Args:
labels_img_np: Image as a Numpy array, assumed to be an
annotated image whose edges will be found by obtaining
the borders of all annotations.
Returns:
Binary image array the same shape as ``labels_img_np`` with labels
reduced to their corresponding borders.
"""
start_time = time()
labels_edge = np.zeros_like(labels_img_np)
label_ids = np.unique(labels_img_np)
is_fork = chunking.is_fork()
initializer = None
initargs = None
if is_fork:
# use a class to set and process the label without having to
# reference the labels image as a global variable
cls.labels_img = labels_img_np
else:
# set up labels image as a shared array for spawned mode
initializer, initargs = cls.build_pool_init({
config.RegNames.IMG_LABELS: labels_img_np})
pool = chunking.get_mp_pool(initializer, initargs)
pool_results = []
for label_id in label_ids:
pool_results.append(
pool.apply_async(
cls.find_label_edge, args=(label_id, )))
for result in pool_results:
label_id, slices, borders = result.get()
if slices is not None:
borders_region = labels_edge[tuple(slices)]
borders_region[borders] = label_id
pool.close()
pool.join()
print("time elapsed to make labels edge:", time() - start_time)
return labels_edge
[docs]
class MeasureLabel(chunking.SharedArrsContainer):
"""Measure metrics within image labels in a way that allows
multiprocessing without global variables.
All images should be of the same shape. If :attr:``df`` is available,
it will be used in place of underlying images. Typically this
data frame contains metrics for labels only at the lowest level,
such as drawn or non-overlapping labels. These labels can then be
used to aggregate values through summation or weighted means to
generate metrics for superseding labels that contains these
individual labels.
Attributes:
atlas_img_np: Sample image as a Numpy array.
labels_img_np: Integer labels image as a Numpy array.
labels_edge: Numpy array of labels reduced to their edges.
dist_to_orig: Distance map of labels to edges, with intensity values
in the same placement as in ``labels_edge``.
heat_map: Numpy array as a density map.
blobs (:obj:`np.ndarray`): 2D array of blobs such as nuclei in the
format, ``[[z, y, x, label_id, ...], ...]``. Defaults to None.
subseg: Integer sub-segmentations labels image as Numpy array.
df: Pandas data frame with a row for each sub-region.
"""
# metric keys
_COUNT_METRICS = (
LabelMetrics.Volume, LabelMetrics.Intensity, LabelMetrics.Nuclei)
_EDGE_METRICS = (
LabelMetrics.EdgeSize, LabelMetrics.EdgeDistSum,
LabelMetrics.EdgeDistMean)
_SHAPE_METRICS = (
LabelMetrics.SurfaceArea, LabelMetrics.Compactness)
_PCL_METRICS = (
LabelMetrics.NucCluster, LabelMetrics.NucClusNoise,
LabelMetrics.NucClusLarg,
)
# images and data frame
atlas_img_np = None
labels_img_np = None
labels_edge = None
dist_to_orig = None
labels_interior = None
heat_map = None
blobs = None
subseg = None
df = None
spacing = None
[docs]
@classmethod
def set_data(cls, atlas_img_np, labels_img_np, labels_edge=None,
dist_to_orig=None, labels_interior=None, heat_map=None,
blobs=None, subseg=None, df=None, spacing=None):
"""Set the images and data frame."""
cls.atlas_img_np = atlas_img_np
cls.labels_img_np = labels_img_np
cls.labels_edge = labels_edge
cls.dist_to_orig = dist_to_orig
cls.labels_interior = labels_interior
cls.heat_map = heat_map
cls.blobs = blobs
cls.subseg = subseg
cls.df = df
cls.spacing = spacing
[docs]
@classmethod
def label_metrics(
cls, label_id: Union[int, Sequence[int]],
extra_metrics: List[config.MetricGroups] = None,
df: Optional[pd.DataFrame] = None, spacing: Sequence[float] = None
) -> Tuple[int, Dict]:
"""Calculate metrics for a given label or set of labels.
Wrapper to call :func:``measure_variation``,
:func:``measure_variation``, and :func:``measure_edge_dist``.
Args:
label_id: Integer of the label or sequence of multiple labels
in :attr:``labels_img_np`` for which to measure variation.
extra_metrics: Sequence of additional metric groups to measure;
defaults to None.
df: Pandas data frame with a row for each sub-region; defaults
to None.
spacing: Sequence of image spacing for each pixel in the images;
defaults to None.
Returns:
Tuple of the given label ID, intensity variation, number of
pixels in the label, density variation, number of blobs,
sum edge distances, mean of edge distances, and number of
pixels in the label edge.
"""
# convert shared arrays to ndarrays if not present
if cls.atlas_img_np is None:
cls.atlas_img_np = cls.convert_shared_arr(
config.RegNames.IMG_ATLAS)
if cls.labels_img_np is None:
cls.labels_img_np = cls.convert_shared_arr(
config.RegNames.IMG_LABELS)
if cls.labels_edge is None:
cls.labels_edge = cls.convert_shared_arr(
config.RegNames.IMG_LABELS_EDGE)
if cls.dist_to_orig is None:
cls.dist_to_orig = cls.convert_shared_arr(
config.RegNames.IMG_LABELS_DIST)
if cls.labels_interior is None:
cls.labels_interior = cls.convert_shared_arr(
config.RegNames.IMG_LABELS_INTERIOR)
if cls.heat_map is None:
cls.heat_map = cls.convert_shared_arr(
config.RegNames.IMG_HEAT_MAP)
if cls.blobs is None:
cls.blobs = cls.convert_shared_arr(config.SUFFIX_BLOBS)
if cls.subseg is None:
cls.subseg = cls.convert_shared_arr(
config.RegNames.IMG_LABELS_SUBSEG)
if df is not None:
# set data frame of stats at finest level
cls.df = df
if spacing is not None:
# set image spacing
cls.spacing = spacing
# process basic metrics
#print("getting label metrics for {}".format(label_id))
_, count_metrics = cls.measure_counts(label_id)
_, var_metrics = cls.measure_variation(label_id)
_, edge_metrics = cls.measure_edge_dist(label_id)
metrics = {**count_metrics, **var_metrics, **edge_metrics}
if extra_metrics:
for extra_metric in extra_metrics:
# process additional metrics by applying corresponding function
fn = None
if extra_metric is config.MetricGroups.SHAPES:
fn = cls.measure_shapes
elif extra_metric is config.MetricGroups.POINT_CLOUD:
fn = cls.measure_point_cloud
if fn:
_, extra_metrics = fn(label_id)
metrics.update(extra_metrics)
return label_id, metrics
[docs]
@classmethod
def measure_counts(cls, label_ids):
"""Measure the distance between edge images.
If :attr:``df`` is available, it will be used to sum values
from labels in ``label_ids`` found in the data frame
rather than re-measuring values from images.
Args:
label_ids: Integer of the label or sequence of multiple labels
in :attr:``labels_img_np`` for which to measure variation.
Returns:
Tuple of the given label ID and a dictionary of metrics.
The metrics are NaN if the label size is 0.
"""
metrics = dict.fromkeys(cls._COUNT_METRICS, np.nan)
nuclei = np.nan
if cls.df is None:
# sum up counts within the collective region
label_mask = np.isin(cls.labels_img_np, label_ids)
label_size = np.sum(label_mask)
intens = np.sum(cls.atlas_img_np[label_mask]) # tot intensity
if cls.heat_map is not None:
nuclei = np.sum(cls.heat_map[label_mask])
else:
# get all rows associated with region and sum stats within columns
labels = cls.df.loc[
cls.df[LabelMetrics.Region.name].isin(label_ids)]
label_size = np.nansum(labels[LabelMetrics.Volume.name])
intens = np.nansum(labels[LabelMetrics.Intensity.name])
if LabelMetrics.Nuclei.name in labels:
nuclei = np.nansum(labels[LabelMetrics.Nuclei.name])
if label_size > 0:
metrics[LabelMetrics.Volume] = label_size
metrics[LabelMetrics.Intensity] = intens
metrics[LabelMetrics.Nuclei] = nuclei
disp_id = get_single_label(label_ids)
print("counts within label {}: {}"
.format(disp_id, libmag.enum_dict_aslist(metrics)))
return label_ids, metrics
[docs]
@classmethod
def region_props(cls, region, metrics, keys):
"""Measure properties for a region and add to a dictionary.
Args:
region: Region to measure, which can be a flattened array.
metrics: Dictionary to store metrics.
keys: Sequence of keys corresponding to standard deviation,
median, and Shannon Entropy measurements.
"""
if region.size < 1:
for key in keys: metrics[key] = np.nan
else:
#print(region.size, len(region))
metrics[keys[0]] = np.std(region)
metrics[keys[1]] = np.mean(region)
metrics[keys[2]] = np.median(region)
metrics[keys[3]], metrics[keys[4]] = np.percentile(region, (5, 95))
metrics[keys[5]] = measure.shannon_entropy(region)
[docs]
@classmethod
def measure_variation(cls, label_ids):
"""Measure the variation in underlying atlas intensity.
Variation is measured by standard deviation of atlas intensity and,
if :attr:``heat_map`` is available, that of the blob density.
If :attr:``df`` is available, it will be used to calculated
weighted averages from labels in ``label_ids`` found in the
data frame rather than re-measuring values from images.
Args:
label_ids: Integer of the label or sequence of multiple labels
in :attr:``labels_img_np`` for which to measure variation.
Returns:
Tuple of the given label ID and a dictionary a metrics.
The metrics are NaN if the label size is 0.
"""
metrics = dict((key, []) for key in VAR_METRICS)
if not libmag.is_seq(label_ids): label_ids = [label_ids]
seg_ids = []
for label_id in label_ids:
# collect all sub-regions
if cls.subseg is not None:
# get sub-segmentations within region
label_mask = cls.labels_img_np == label_id
seg_ids.extend(np.unique(cls.subseg[label_mask]).tolist())
else:
seg_ids.append(label_id)
if cls.df is None:
# calculate stats for each sub-segmentation; regional ("reg")
# means are weighted across regions and sub-segs, where the
# mean for each region which should equal total of full region
# if only one sub-seg
for seg_id in seg_ids:
if cls.subseg is not None:
seg_mask = cls.subseg == seg_id
else:
seg_mask = cls.labels_img_np == seg_id
size = np.sum(seg_mask)
if size > 0:
# variation in intensity of underlying atlas/sample region
vals = dict((key, np.nan) for key in VAR_METRICS)
vals[LabelMetrics.RegVolMean] = size
atlas_mask = cls.atlas_img_np[seg_mask]
cls.region_props(
atlas_mask, vals,
(LabelMetrics.VarIntensity,
LabelMetrics.MeanIntensity,
LabelMetrics.MedIntensity,
LabelMetrics.LowIntensity,
LabelMetrics.HighIntensity,
LabelMetrics.EntropyIntensity))
vals[LabelMetrics.CoefVarIntens] = (
vals[LabelMetrics.VarIntensity]
/ vals[LabelMetrics.MeanIntensity])
interior_mask = None
border_mask = None
if cls.labels_interior is not None:
# inner vs border variability
interior_mask = cls.labels_interior == seg_id
border_mask = np.logical_xor(seg_mask, interior_mask)
atlas_interior = cls.atlas_img_np[interior_mask]
atlas_border = cls.atlas_img_np[border_mask]
vals[LabelMetrics.VarIntensIn] = np.std(atlas_interior)
vals[LabelMetrics.VarIntensOut] = np.std(atlas_border)
# get variability interior-border match as abs diff
vals[LabelMetrics.VarIntensMatch] = abs(
vals[LabelMetrics.VarIntensOut]
- vals[LabelMetrics.VarIntensIn])
# get variability interior-border simple difference
vals[LabelMetrics.VarIntensDiff] = (
vals[LabelMetrics.VarIntensOut]
- vals[LabelMetrics.VarIntensIn])
if cls.heat_map is not None:
# number of blob and variation in blob density
blobs_per_px = cls.heat_map[seg_mask]
vals[LabelMetrics.VarNuclei] = np.std(blobs_per_px)
vals[LabelMetrics.RegNucMean] = np.sum(blobs_per_px)
vals[LabelMetrics.MeanNuclei] = np.mean(blobs_per_px)
if (interior_mask is not None and
border_mask is not None):
heat_interior = cls.heat_map[interior_mask]
heat_border = cls.heat_map[border_mask]
vals[LabelMetrics.VarNucIn] = np.std(heat_interior)
vals[LabelMetrics.VarNucOut] = np.std(heat_border)
vals[LabelMetrics.VarNucMatch] = abs(
vals[LabelMetrics.VarNucOut]
- vals[LabelMetrics.VarNucIn])
vals[LabelMetrics.CoefVarNuc] = (
vals[LabelMetrics.VarNuclei]
/ vals[LabelMetrics.MeanNuclei])
for metric in VAR_METRICS:
metrics[metric].append(vals[metric])
else:
# get sub-region stats stored in data frame
labels = cls.df.loc[cls.df[LabelMetrics.Region.name].isin(seg_ids)]
for i, row in labels.iterrows():
if row[LabelMetrics.RegVolMean.name] > 0:
for metric in VAR_METRICS:
if metric.name in row:
metrics[metric].append(row[metric.name])
else:
metrics[metric] = np.nan
# weighted average, with weights given by frac of region or
# sub-region size from total size
disp_id = get_single_label(label_ids)
vols = np.copy(metrics[LabelMetrics.RegVolMean])
tot_size = np.sum(vols) # assume no nans
nucs = np.copy(metrics[LabelMetrics.RegNucMean])
tot_nucs = np.nansum(nucs)
for key in metrics.keys():
#print("{} {}: {}".format(disp_id, key.name, metrics[key]))
if tot_size > 0 and metrics[key] != np.nan:
# take weighted mean
if key in NUC_METRICS:
# use weighting from nuclei for nuclei-oriented metrics
metrics[key] = np.nansum(
np.multiply(metrics[key], nucs)) / tot_nucs
else:
# default to weighting by volume
metrics[key] = np.nansum(
np.multiply(metrics[key], vols)) / tot_size
if tot_size <= 0 or metrics[key] == 0: metrics[key] = np.nan
print("variation within label {}: {}"
.format(disp_id, libmag.enum_dict_aslist(metrics)))
return label_ids, metrics
[docs]
@classmethod
def measure_edge_dist(cls, label_ids):
"""Measure the distance between edge images.
If :attr:``df`` is available, it will be used to calculated
a sum from edge distance sum or weighted averages from edge
distance mean values from labels in ``label_ids`` found in the
data frame rather than re-measuring values from images.
Args:
label_ids: Integer of the label or sequence of multiple labels
in :attr:``labels_img_np`` for which to measure variation.
Returns:
Tuple of the given label ID and dictionary of metrics.
The metrics are NaN if the label size is 0.
"""
metrics = dict.fromkeys(cls._EDGE_METRICS, np.nan)
# get collective region
label_mask = None
labels = None
if cls.df is None:
# get region directly from image
label_mask = np.isin(cls.labels_edge, label_ids)
label_size = np.sum(label_mask)
else:
# get all row associated with region
labels = cls.df.loc[
cls.df[LabelMetrics.Region.name].isin(label_ids)]
label_size = np.nansum(labels[LabelMetrics.Volume.name])
if label_size > 0:
if cls.df is None:
# sum and take average directly from image
region_dists = cls.dist_to_orig[label_mask]
metrics[LabelMetrics.EdgeDistSum] = np.sum(region_dists)
metrics[LabelMetrics.EdgeDistMean] = np.mean(region_dists)
metrics[LabelMetrics.EdgeSize] = region_dists.size
else:
# take sum from rows and weight means by edge sizes
if LabelMetrics.EdgeDistSum.name in labels:
metrics[LabelMetrics.EdgeDistSum] = np.nansum(
labels[LabelMetrics.EdgeDistSum.name])
if LabelMetrics.EdgeSize.name in labels:
sizes = labels[LabelMetrics.EdgeSize.name]
size = np.sum(sizes)
metrics[LabelMetrics.EdgeSize] = size
if LabelMetrics.EdgeDistMean.name in labels:
metrics[LabelMetrics.EdgeDistMean] = (
np.sum(np.multiply(
sizes, labels[LabelMetrics.EdgeDistMean.name]))
/ size)
disp_id = get_single_label(label_ids)
print("dist within edge of label {}: {}"
.format(disp_id, libmag.enum_dict_aslist(metrics)))
return label_ids, metrics
[docs]
@classmethod
def measure_shapes(cls, label_ids):
"""Measure label shapes.
Labels will be measured even if :attr:``df`` is available
to account for the global shape rather than using weighted-averages.
Args:
label_ids: Integer of the label or sequence of multiple labels
in :attr:``labels_img_np`` for which to measure shapes.
Returns:
Tuple of the given label ID and a dictionary of metrics.
"""
metrics = dict.fromkeys(cls._SHAPE_METRICS, np.nan)
# sum up counts within the collective region
label_mask = np.isin(cls.labels_img_np, label_ids)
label_size = np.sum(label_mask)
if label_size > 0:
compactness, area, _ = cv_nd.compactness_3d(
label_mask, cls.spacing)
metrics[LabelMetrics.SurfaceArea] = area
metrics[LabelMetrics.Compactness] = compactness
# TODO: high memory consumption with these measurements
# props = measure.regionprops(label_mask.astype(np.uint8))
# if props:
# prop = props[0]
# metrics[LabelMetrics.ConvexVolume] = prop.convex_area
# metrics[LabelMetrics.Solidity] = prop.solidity
props = None
disp_id = get_single_label(label_ids)
print("shape measurements of label {}: {}"
.format(disp_id, libmag.enum_dict_aslist(metrics)))
return label_ids, metrics
[docs]
@classmethod
def measure_point_cloud(cls, label_ids):
"""Measure point cloud statistics such as those from nuclei.
Assumes that the class attribute :attr:`blobs` is available.
Args:
label_ids: Integer of the label or sequence of multiple labels
in :attr:``labels_img_np`` for which to measure variation.
Returns:
Tuple of the given label ID and dictionary of metrics.
The metrics are NaN if the label size is 0.
"""
metrics = dict.fromkeys(cls._PCL_METRICS, np.nan)
if cls.df is None and cls.blobs is None:
print("data frame and blobs not available, unable to measure"
"point cloud stats")
return label_ids, metrics
# get collective region
labels = None
if cls.df is None:
# get region directly from image
label_mask = np.isin(cls.labels_img_np, label_ids)
label_size = np.sum(label_mask)
else:
# get all row associated with region
labels = cls.df.loc[
cls.df[LabelMetrics.Region.name].isin(label_ids)]
label_size = np.nansum(labels[LabelMetrics.Volume.name])
if label_size > 0:
if cls.df is None:
# sum and take average directly from image
blobs = cls.blobs[np.isin(cls.blobs[:, 3], label_ids)]
num_clusters, num_noise, num_largest = (
clustering.cluster_dbscan_metrics(blobs[:, 4]))
metrics[LabelMetrics.NucCluster] = num_clusters
metrics[LabelMetrics.NucClusNoise] = num_noise
metrics[LabelMetrics.NucClusLarg] = num_largest
else:
for key in metrics.keys():
if key.name not in labels: continue
metrics[key] = np.nansum(labels[key.name])
disp_id = get_single_label(label_ids)
print("nuclei clusters within label {}: {}"
.format(disp_id, libmag.enum_dict_aslist(metrics)))
return label_ids, metrics
[docs]
def get_single_label(label_id):
"""Get an ID as a single element.
Args:
label_id: Single ID or sequence of IDs.
Returns:
The first elements if ``label_id`` is a sequence, or the
``label_id`` itself if not.
"""
if libmag.is_seq(label_id) and len(label_id) > 0:
return label_id[0]
return label_id
def _update_df_side(df):
# invert label IDs of right-sided regions; assumes that using df
# will specify sides explicitly in label_ids
# TODO: consider removing combine_sides and using label_ids only
df.loc[df[config.AtlasMetrics.SIDE.value] == config.HemSides.RIGHT.value,
LabelMetrics.Region.name] *= -1
def _parse_vol_metrics(label_metrics, spacing=None, unit_factor=None,
extra_keys=None):
# parse volume metrics into physical units and nuclei density
physical_mult = None if spacing is None else np.prod(spacing)
keys = [LabelMetrics.Volume, LabelMetrics.VolAlt]
if extra_keys is not None:
keys.extend(extra_keys)
vols_phys = []
found_keys = []
for key in keys:
if key in label_metrics:
vols_phys.append(label_metrics[key])
found_keys.append(key)
label_size = vols_phys[0]
if physical_mult is not None:
# convert to physical units at the given value unless
# using data frame, where values presumably already converted
vols_phys = np.multiply(vols_phys, physical_mult)
if unit_factor is not None:
# further conversion to given unit size
unit_factor_vol = unit_factor ** len(spacing)
vols_phys = np.divide(vols_phys, unit_factor_vol)
if unit_factor is not None:
# convert metrics not extracted from data frame
if LabelMetrics.SurfaceArea in label_metrics:
# already incorporated physical units but needs to convert
# to unit size
label_metrics[LabelMetrics.SurfaceArea] /= unit_factor ** 2
# calculate densities based on physical volumes
for key, val in zip(found_keys, vols_phys):
label_metrics[key] = val
nuc = np.nan
if LabelMetrics.Nuclei in label_metrics:
nuc = label_metrics[LabelMetrics.Nuclei]
label_metrics[LabelMetrics.Density] = nuc / vols_phys[0]
return label_size, nuc, vols_phys
def _update_vol_dicts(label_id, label_metrics, grouping, metrics):
# parse volume metrics metadata into master metrics dictionary
side = ontology.get_label_side(label_id)
grouping[config.AtlasMetrics.SIDE.value] = side
disp_id = get_single_label(label_id)
label_metrics[LabelMetrics.Region] = abs(disp_id)
for key, val in grouping.items():
metrics.setdefault(key, []).append(val)
for col in LabelMetrics:
if col in label_metrics:
metrics.setdefault(col.name, []).append(label_metrics[col])
[docs]
def measure_labels_metrics(atlas_img_np, labels_img_np,
labels_edge, dist_to_orig, labels_interior=None,
heat_map=None, blobs=None,
subseg=None, spacing=None, unit_factor=None,
combine_sides=True, label_ids=None, grouping={},
df=None, extra_metrics=None):
"""Compute metrics such as variation and distances within regions
based on maps corresponding to labels image.
Args:
atlas_img_np: Atlas or sample image as a Numpy array.
labels_img_np: Integer labels image as a Numpy array.
labels_edge: Numpy array of labels reduced to their edges.
dist_to_orig: Distance map of labels to edges, with intensity values
in the same placement as in ``labels_edge``.
labels_interior: Numpy array of labels eroded to interior region.
heat_map: Numpy array as a density map; defaults to None to ignore
density measurements.
blobs (:obj:`np.ndarray`): 2D array of blobs; defaults to None.
subseg: Integer sub-segmentations labels image as Numpy array;
defaults to None to ignore label sub-divisions.
spacing: Sequence of image spacing for each pixel in the images.
unit_factor: Unit factor conversion; defaults to None. Eg use
1000 to convert from um to mm.
combine_sides: True to combine corresponding labels from opposite
sides of the sample; defaults to True. Corresponding labels
are assumed to have the same absolute numerical number and
differ only in signage. May be False if combining by passing
both pos/neg labels in ``label_ids``.
label_ids: Sequence of label IDs to include. Defaults to None,
in which case the labels will be taken from unique values
in ``labels_img_np``.
grouping: Dictionary of sample grouping metadata, where each
entry will be added as a separate column. Defaults to an
empty dictionary.
df: Data frame with rows for all drawn labels to pool into
parent labels instead of re-measuring stats for all
children of each parent; defaults to None.
extra_metrics (List[:obj:`config.MetricGroups`]): List of enums
specifying additional stats; defaults to None.
Returns:
Pandas data frame of the regions and weighted means for the metrics.
"""
start_time = time()
if df is None:
# convert to physical units based on spacing and unit conversion
vol_args = {"spacing": spacing, "unit_factor": unit_factor}
else:
# units already converted, but need to convert sides
_update_df_side(df)
vol_args = {}
is_fork = chunking.is_fork()
initializer = None
initargs = None
if is_fork:
# use a class to set and process the label without having to
# reference the labels image as a global variable in forked mode
MeasureLabel.set_data(
atlas_img_np, labels_img_np, labels_edge, dist_to_orig,
labels_interior, heat_map, blobs, subseg, df, spacing)
else:
# set up shared arrays for spawned mode
_logger.info("Initializing multiprocessing pool with images, which "
"may take awhile to complete")
initializer, initargs = MeasureLabel.build_pool_init({
config.RegNames.IMG_ATLAS: atlas_img_np,
config.RegNames.IMG_LABELS: labels_img_np,
config.RegNames.IMG_LABELS_EDGE: labels_edge,
config.RegNames.IMG_LABELS_DIST: dist_to_orig,
config.RegNames.IMG_LABELS_INTERIOR: labels_interior,
config.RegNames.IMG_HEAT_MAP: heat_map,
config.SUFFIX_BLOBS: blobs,
config.RegNames.IMG_LABELS_SUBSEG: subseg,
})
metrics = {}
grouping[config.AtlasMetrics.SIDE.value] = None
pool = chunking.get_mp_pool(initializer, initargs)
pool_results = []
if label_ids is None:
label_ids = np.unique(labels_img_np)
if combine_sides: label_ids = label_ids[label_ids >= 0]
for label_id in label_ids:
# include corresponding labels from opposite sides while skipping
# background
if label_id == 0: continue
if combine_sides: label_id = [label_id, -1 * label_id]
args = [label_id, extra_metrics]
if not is_fork:
args.append(df)
args.append(spacing)
pool_results.append(
pool.apply_async(MeasureLabel.label_metrics, args=args))
totals = {}
for result in pool_results:
# get metrics by label
label_id, label_metrics = result.get()
label_size, nuc, (vol_physical, vol_mean_physical) = _parse_vol_metrics(
label_metrics, extra_keys=(LabelMetrics.RegVolMean,), **vol_args)
reg_nuc_mean = label_metrics[LabelMetrics.RegNucMean]
edge_size = label_metrics[LabelMetrics.EdgeSize]
# calculate densities based on physical volumes
label_metrics[LabelMetrics.RegVolMean] = vol_mean_physical
label_metrics[LabelMetrics.RegDensityMean] = (
reg_nuc_mean / vol_mean_physical)
# transfer all found metrics to master dictionary
_update_vol_dicts(label_id, label_metrics, grouping, metrics)
# weight and accumulate total metrics
totals.setdefault(LabelMetrics.EdgeDistSum, []).append(
label_metrics[LabelMetrics.EdgeDistSum] * edge_size)
totals.setdefault(LabelMetrics.EdgeDistMean, []).append(
label_metrics[LabelMetrics.EdgeDistMean] * edge_size)
totals.setdefault(LabelMetrics.EdgeSize, []).append(edge_size)
totals.setdefault(LabelMetrics.VarIntensity, []).append(
label_metrics[LabelMetrics.VarIntensity] * label_size)
totals.setdefault("vol", []).append(label_size)
totals.setdefault(LabelMetrics.Volume, []).append(vol_physical)
totals.setdefault(LabelMetrics.RegVolMean, []).append(
vol_mean_physical * label_size)
var_nuc = label_metrics[LabelMetrics.VarNuclei]
if var_nuc != np.nan:
totals.setdefault(LabelMetrics.VarNuclei, []).append(
label_metrics[LabelMetrics.VarNuclei] * label_size)
totals.setdefault(LabelMetrics.Nuclei, []).append(nuc)
if reg_nuc_mean != np.nan:
totals.setdefault(LabelMetrics.RegNucMean, []).append(
reg_nuc_mean * label_size)
pool.close()
pool.join()
# make data frame of raw metrics, dropping columns of all NaNs
df = pd.DataFrame(metrics)
df = df.dropna(axis=1, how="all")
df_io.print_data_frame(df)
# build data frame of total metrics from weighted means
metrics_all = {}
grouping[config.AtlasMetrics.SIDE.value] = "both"
for key, val in grouping.items():
metrics_all.setdefault(key, []).append(val)
for key in totals.keys():
totals[key] = np.nansum(totals[key])
if totals[key] == 0: totals[key] = np.nan
# divide weighted values by sum of corresponding weights
totals[LabelMetrics.Region] = "all"
totals[LabelMetrics.RegVolMean] /= totals["vol"]
if LabelMetrics.Nuclei in totals:
totals[LabelMetrics.Density] = (
totals[LabelMetrics.Nuclei] / totals[LabelMetrics.Volume])
if LabelMetrics.RegNucMean in totals:
totals[LabelMetrics.RegNucMean] /= totals["vol"]
totals[LabelMetrics.RegDensityMean] = (
totals[LabelMetrics.RegNucMean] / totals[LabelMetrics.RegVolMean])
if LabelMetrics.VarNuclei in totals:
totals[LabelMetrics.VarNuclei] /= totals["vol"]
totals[LabelMetrics.VarIntensity] /= totals["vol"]
totals[LabelMetrics.EdgeDistMean] /= totals[LabelMetrics.EdgeSize]
for col in LabelMetrics:
if col in totals:
metrics_all.setdefault(col.name, []).append(totals[col])
df_all = pd.DataFrame(metrics_all)
df_io.print_data_frame(df_all)
print("time elapsed to measure variation:", time() - start_time)
return df, df_all
[docs]
class MeasureLabelOverlap(object):
"""Measure metrics comparing two versions of image labels in a way
that allows multiprocessing without global variables.
All images should be of the same shape. If :attr:``df`` is available,
it will be used in place of underlying images. Typically this
data frame contains metrics for labels only at the lowest level,
such as drawn or non-overlapping labels. These labels can then be
used to aggregate values through summation or weighted means to
generate metrics for superseding labels that contains these
individual labels.
Attributes:
labels_imgs: Sequence of integer labels image as Numpy arrays.
heat_map: Numpy array as a density map; defaults to None to ignore
density measurements.
df: Pandas data frame with a row for each sub-region.
"""
_OVERLAP_METRICS = (
LabelMetrics.Volume,
LabelMetrics.VolPx,
LabelMetrics.VolAlt,
LabelMetrics.VolAltPx,
LabelMetrics.Nuclei,
LabelMetrics.VolDSC,
LabelMetrics.NucDSC,
LabelMetrics.VolOut,
LabelMetrics.NucOut,
)
# images and data frame
labels_imgs = None
heat_map = None
df = None
[docs]
@classmethod
def set_data(cls, labels_imgs, heat_map=None, df=None):
"""Set the images and data frame."""
cls.labels_imgs = labels_imgs
cls.heat_map = heat_map
cls.df = df
[docs]
@classmethod
def measure_overlap(cls, label_ids):
"""Measure the overlap between image labels.
If :attr:``df`` is available, it will be used to sum values
from labels in ``label_ids`` found in the data frame
rather than re-measuring values from images.
Args:
label_ids: Integer of the label or sequence of multiple labels
in :attr:``labels_img_np`` for which to measure variation.
Returns:
Tuple of the given label ID and a dictionary of metrics.
The metrics are NaN if the label size is 0.
"""
metrics = dict.fromkeys(cls._OVERLAP_METRICS, np.nan)
nuclei = np.nan
nuc_dsc = np.nan
nuc_out = np.nan
if cls.df is None:
# find DSC between original and updated versions of the
# collective region
label_masks = [np.isin(l, label_ids) for l in cls.labels_imgs]
label_vol = np.sum(label_masks[0])
label_vol_alt = np.sum(label_masks[1])
vol_dsc = atlas_stats.meas_dice(label_masks[0], label_masks[1])
# sum up volume and nuclei count in the new version outside of
# the original version; assume that remaining original volume
# will be accounted for by the other labels that reoccupy it
mask_out = np.logical_and(label_masks[1], ~label_masks[0])
vol_out = np.sum(mask_out)
if cls.heat_map is not None:
nuclei = np.sum(cls.heat_map[label_masks[0]])
nuc_dsc = atlas_stats.meas_dice(
label_masks[0], label_masks[1], cls.heat_map)
nuc_out = np.sum(cls.heat_map[mask_out])
else:
# get weighted average of DSCs from all rows in a super-region,
# assuming all rows are at the lowest hierarchical level
labels = cls.df.loc[
cls.df[LabelMetrics.Region.name].isin(label_ids)]
label_vols = labels[LabelMetrics.Volume.name]
label_vol = np.nansum(label_vols)
label_vol_alt = np.nansum(labels[LabelMetrics.VolAlt.name])
vol_dscs = labels[LabelMetrics.VolDSC.name]
vol_dsc = df_io.weight_mean(vol_dscs, label_vols)
# sum up volume and nuclei outside of original regions
vol_out = np.nansum(labels[LabelMetrics.VolOut.name])
if LabelMetrics.Nuclei.name in labels:
nucs = labels[LabelMetrics.Nuclei.name]
nuclei = np.nansum(nucs)
nuc_dscs = labels[LabelMetrics.NucDSC.name]
nuc_dsc = df_io.weight_mean(nuc_dscs, nucs)
nuc_out = np.nansum(labels[LabelMetrics.NucOut.name])
if label_vol > 0:
# update dict with metric values; px vals will not get converted
# to physical units
metrics[LabelMetrics.Volume] = label_vol
metrics[LabelMetrics.VolPx] = label_vol
metrics[LabelMetrics.VolAlt] = label_vol_alt
metrics[LabelMetrics.VolAltPx] = label_vol_alt
metrics[LabelMetrics.Nuclei] = nuclei
metrics[LabelMetrics.VolDSC] = vol_dsc
metrics[LabelMetrics.NucDSC] = nuc_dsc
metrics[LabelMetrics.VolOut] = vol_out
metrics[LabelMetrics.NucOut] = nuc_out
disp_id = get_single_label(label_ids)
print("overlaps within label {}: {}"
.format(disp_id, libmag.enum_dict_aslist(metrics)))
return label_ids, metrics
[docs]
def measure_labels_overlap(labels_imgs, heat_map=None, spacing=None,
unit_factor=None, combine_sides=True,
label_ids=None, grouping={}, df=None):
"""Compute metrics comparing two version of atlas labels.
Args:
labels_imgs: Sequence of integer labels image as Numpy arrays.
heat_map: Numpy array as a density map; defaults to None to ignore
density measurements.
spacing: Sequence of image spacing for each pixel in the images.
unit_factor: Unit factor conversion; defaults to None. Eg use
1000 to convert from um to mm.
combine_sides: True to combine corresponding labels from opposite
sides of the sample; defaults to True. Corresponding labels
are assumed to have the same absolute numerical number and
differ only in signage. May be False if combining by passing
both pos/neg labels in ``label_ids``.
label_ids: Sequence of label IDs to include. Defaults to None,
in which case the labels will be taken from unique values
in ``labels_img_np``.
grouping: Dictionary of sample grouping metadata, where each
entry will be added as a separate column. Defaults to an
empty dictionary.
df: Data frame with rows for all drawn labels to pool into
parent labels instead of re-measuring stats for all
children of each parent; defaults to None.
Returns:
:obj:`pd.DataFrame`: Pandas data frame of the regions and weighted
means for the metrics.
"""
start_time = time()
if df is None:
vol_args = {"spacing": spacing, "unit_factor": unit_factor}
else:
_update_df_side(df)
vol_args = {}
# use a class to set and process the label without having to
# reference the labels image as a global variable
MeasureLabelOverlap.set_data(labels_imgs, heat_map, df)
metrics = {}
grouping[config.AtlasMetrics.SIDE.value] = None
pool = chunking.get_mp_pool()
pool_results = []
for label_id in label_ids:
# include corresponding labels from opposite sides while skipping
# background
if label_id == 0: continue
if combine_sides: label_id = [label_id, -1 * label_id]
pool_results.append(
pool.apply_async(
MeasureLabelOverlap.measure_overlap, args=(label_id,)))
for result in pool_results:
# get metrics by label
label_id, label_metrics = result.get()
label_size, nuc, _ = _parse_vol_metrics(
label_metrics, extra_keys=(LabelMetrics.VolOut,), **vol_args)
# transfer all found metrics to master dictionary
_update_vol_dicts(label_id, label_metrics, grouping, metrics)
pool.close()
pool.join()
# make data frame of raw metrics, dropping columns of all NaNs
df = pd.DataFrame(metrics)
df = df.dropna(axis=1, how="all")
df_io.print_data_frame(df)
print("time elapsed to measure variation:", time() - start_time)
return df
[docs]
def map_meas_to_labels(
labels_img: np.ndarray, df: pd.DataFrame, meas: str,
fn_avg: Callable[[Sequence], float], skip_nans: bool = False,
reverse: bool = False, col_wt: Optional[str] = None
) -> Optional[np.ndarray]:
"""Generate a map of a given measurement on a labels image.
The intensity values of labels will be replaced by the given metric
of the chosen measurement, such as the mean of the densities. If
multiple conditions exist, the difference of metrics for the first
two conditions will be taken under the assumption that the values for
each condition are in matching order.
Args:
labels_img: Labels image as a Numpy array in x,y,z.
df: Pandas data frame with measurements by regions corresponding
to that of ``labels_img``.
meas: Name of column in ``df`` from which to extract measurements.
fn_avg: Function to apply to the column for each region. If None,
``df`` is assumed to already contain statistics generated from
the ``clrstats`` R package, which will be extracted directly.
skip_nans: True to skip any region with NaNs, leaving 0 instead;
defaults to False to allow NaNs in resulting image. Some
applications may not be able to read NaNs, so this parameter
allows giving a neutral value instead.
reverse: Reverse the order of sorted conditions when generating
stats by ``fn_avg`` to compare conditions; defaults to False.
col_wt: Name of column to use for weighting, where the
magnitude of ``meas`` will be adjusted as fractions of the max
value in this weighting column for labels found in ``labels_img``;
defaults to None.
Retunrs:
A map of averages for the given measurement as an image of the
same shape as ``labels_img`` of float data type, or None if no
values for ``meas`` are found.
"""
if meas not in df or np.all(np.isnan(df[meas])):
# ensure that measurement column is present with non-NaNs
print("{} not in data frame or all NaNs, no image to generate"
.format(meas))
return None
# make image array to map differences for each label and filter data
# frame to get only these regions
labels_diff = np.zeros_like(labels_img, dtype=float)
labels_img_abs = np.abs(labels_img)
regions = np.unique(labels_img_abs)
df = df.loc[df["Region"].isin(regions)].copy()
df_cond = None
conds = config.plot_labels[config.PlotLabels.CONDITION]
if conds is None and "Condition" in df:
# get and sort conditions
df_cond = df["Condition"]
conds = sorted(np.unique(df_cond), reverse=reverse)
if col_wt is not None and len(df) > 0:
# weight given column for the first condition and normalizing it to
# its maximum value, or use the whole column if no conditions exist
print("weighting stats by", col_wt)
wts = df.loc[df_cond == conds[0], col_wt] if conds else df[col_wt]
wts /= max(wts)
if conds:
for cond in conds:
# use raw values to avoid multiplying by index; assumes
# matching order of values between conditions
df.loc[df_cond == cond, meas] = np.multiply(
df.loc[df_cond == cond, meas].values, wts.values)
else:
df.loc[:, meas] *= wts
for region in regions:
# get difference for each region, either from a single column
# that already has the difference of effect size of by taking
# the difference from two columns
df_region = df[df[LabelMetrics.Region.name] == region]
labels_region = labels_img_abs == region
diff = np.nan
if fn_avg is None:
# assume that df was output by R clrstats package
if df_region.shape[0] > 0:
diff = df_region[meas]
else:
if len(conds) >= 2:
# compare the metrics for the first two conditions
avgs = []
for cond in conds:
# gather separate metrics for each condition
df_region_cond = df_region[df_region["Condition"] == cond]
# print(df_region_cond)
reg_avg = fn_avg(df_region_cond[meas])
# print(region, cond, reg_avg)
avgs.append(reg_avg)
# TODO: consider making order customizable
diff = avgs[1] - avgs[0]
else:
# take the metric for the single condition
diff = fn_avg(df_region[meas])
if skip_nans and np.isnan(diff):
diff = 0
labels_diff[labels_region] = diff
print("label {} difference: {}".format(region, diff))
return labels_diff
[docs]
def labels_distance(
labels_img1: np.ndarray, labels_img2: np.ndarray,
spacing: Optional[Sequence[float]] = None,
out_path: Optional[str] = None,
name: Optional[str] = None) -> pd.DataFrame:
"""Measure distances between corresponding labels in two images.
Assumes that a 0 is background and will be skipped.
Args:
labels_img1: Labels image 1.
labels_img2: Labels image 2. Does not
have to be of the same shape as ``labels_img``, but assumed
to have the same origin/offset and ``spacing`` as distances
are based on centroid coordinates of the corresponding labels.
spacing: Spacing/scaling in ``z,y,x``; defaults to None.
out_path: CSV output path; defaults to None to not save.
name: Sample name; defaults to None.
Returns:
Data frame of output metrics.
"""
dists = []
# pool unique labels from both images
label_ids1 = np.unique(labels_img1)
label_ids2 = np.unique(labels_img2)
label_ids = np.unique(np.append(label_ids1, label_ids2))
imgs = (labels_img1, labels_img2)
for label_id in label_ids:
if label_id == 0: continue
if label_id in label_ids1 and label_id in label_ids2:
# compute distance between centroids of corresponding labels
# in both images, scaled by spacing if provided
centroids = [
cv_nd.get_label_props(m, label_id)[0].centroid for m in imgs]
centroids_scaled = centroids
if spacing is not None:
centroids = [np.multiply(c, spacing) for c in centroids]
dist = cdist(
np.array([centroids[0]]), np.array([centroids[1]]))[0][0]
else:
# label missing from at least one image
centroids = [np.nan] * 2
centroids_scaled = centroids
dist = np.nan
dists.append((
name, label_id, *centroids_scaled[:2], *centroids[:2], dist))
# export metrics to data frame
df = df_io.dict_to_data_frame(
dists, out_path, show=True, records_cols=(
config.AtlasMetrics.SAMPLE.value,
LabelMetrics.Region.name,
"Centroid1_px",
"Centroid2_px",
"Centroid1",
"Centroid2",
LabelMetrics.Dist.name))
return df
[docs]
def get_metric_weight_col(stat):
"""Get the weighting column for a given metric.
Args:
stat (str): The metric for which to find the appropriate weighting
metric.
Returns:
The name of the corresponding weighting metric as a string.
"""
col_wt = None
if stat in [metric.name for metric in WT_METRICS]:
col_wt = LabelMetrics.Volume.name
if stat in [metric.name for metric in NUC_METRICS]:
col_wt = LabelMetrics.Nuclei.name
return col_wt