Source code for magmap.stats.atlas_stats

# Atlas measurements and statistics
# Author: David Young, 2019, 2023
"""Low-level measurement of atlases and statistics generation.

Typically applied to specific types of atlases and less generalizable
than measurements in :module:`vols`.
"""
import os
from typing import Optional, Sequence

import numpy as np
import pandas as pd

from magmap.atlas import atlas_refiner, ontology
from magmap.io import df_io, export_stack, libmag, np_io, sitk_io
from magmap.plot import colormaps, plot_2d, plot_support
from magmap.settings import config
from magmap.stats import vols


[docs] def plot_region_development(metric, size=None, show=True): """Plot regions across development for the given metric. Args: metric (str): Column name of metric to track. size (List[int]): Sequence of ``width, height`` to size the figure; defaults to None. show (bool): True to display the image; defaults to True. """ # set up access to data frame columns id_cols = ["Age", "Condition"] extra_cols = ["RegionName"] cond_col = "Region" # assume that vol stats file is given first, then region IDs; # merge in region names and levels df_regions = pd.read_csv(config.filenames[1]) df = pd.read_csv(config.filename).merge( df_regions[["Region", "RegionName", "Level"]], on="Region", how="left") # convert sample names to ages ages = ontology.rel_to_abs_ages(df["Sample"].unique()) df["Age"] = df["Sample"].map(ages) # get large super-structures for normalization to brain tissue, where # "non-brain" are spinal cord and ventricles, which are variably labeled df_base = df[df["Region"] == 15564] ids_nonbr_large = (17651, 126651558) dfs_nonbr_large = [df[df["Region"] == n] for n in ids_nonbr_large] # get data frame with region IDs of all non-brain structures removed labels_ref_lookup = ontology.LabelsRef(config.load_labels).load().ref_lookup ids_nonbr = [] for n in ids_nonbr_large: ids_nonbr.extend( ontology.get_children_from_id(labels_ref_lookup, n)) label_id = config.atlas_labels[config.AtlasLabels.ID] if label_id is not None: # show only selected region and its children ids = ontology.get_children_from_id(labels_ref_lookup, label_id) df = df[np.isin(df["Region"], ids)] df_brain = df.loc[~df["Region"].isin(ids_nonbr)] levels = np.sort(df["Level"].unique()) conds = df["Condition"].unique() # get aggregated whole brain tissue for normalization cols_show = (*id_cols, cond_col, *extra_cols, metric) if dfs_nonbr_large: # add all large non-brain structures df_nonbr = dfs_nonbr_large[0] for df_out in dfs_nonbr_large[1:]: df_nonbr = df_io.normalize_df( df_nonbr, id_cols, cond_col, None, [metric], extra_cols, df_out, df_io.df_add) # subtract them from whole organism to get brain tissue alone, # updating given metric in db_base df_base = df_io.normalize_df( df_base, id_cols, cond_col, None, [metric], extra_cols, df_nonbr, df_io.df_subtract) df_base.loc[:, "RegionName"] = "Brain tissue" print("Brain {}:".format(metric)) df_io.print_data_frame(df_base.loc[:, cols_show], "\t") df_base_piv, regions = df_io.pivot_with_conditions( df_base, id_cols, "RegionName", metric) # plot lines with separate styles for each condition and colors for # each region name linestyles = ("--", "-.", ":", "-") num_conds = len(conds) linestyles = linestyles * (num_conds // (len(linestyles) + 1) + 1) if num_conds < len(linestyles): # ensure that 1st and last styles are dashed and solid unless linestyles = (*linestyles[:num_conds-1], linestyles[-1]) lines_params = { "labels": (metric, "Post-Conceptional Age"), "linestyles": linestyles, "size": size, "show": show, "ignore_invis": True, "groups": conds, "marker": ".", } line_params_norm = lines_params.copy() line_params_norm["labels"] = ("Fraction", "Post-Conceptional Age") plot_2d.plot_lines( config.filename, "Age", regions, title="Whole Brain Development ({})".format(metric), suffix="_dev_{}_brain".format(metric), df=df_base_piv, **lines_params) for level in levels: # plot raw metric at given level df_level = df.loc[df["Level"] == level] print("Raw {}:".format(metric)) df_io.print_data_frame(df_level.loc[:, cols_show], "\t") df_level_piv, regions = df_io.pivot_with_conditions( df_level, id_cols, "RegionName", metric) plot_2d.plot_lines( config.filename, "Age", regions, title="Structure Development ({}, Level {})".format( metric, level), suffix="_dev_{}_level{}".format(metric, level), df=df_level_piv, **lines_params) # plot metric normalized to whole brain tissue; structures # above removed regions will still contain them df_brain_level = df_brain.loc[df_brain["Level"] == level] df_norm = df_io.normalize_df( df_brain_level, id_cols, cond_col, None, [metric], extra_cols, df_base) print("{} normalized to whole brain:".format(metric)) df_io.print_data_frame(df_norm.loc[:, cols_show], "\t") df_norm_piv, regions = df_io.pivot_with_conditions( df_norm, id_cols, "RegionName", metric) plot_2d.plot_lines( config.filename, "Age", regions, units=(None, config.plot_labels[config.PlotLabels.X_UNIT]), title=("Structure Development Normalized to Whole " "Brain ({}, Level {})".format(metric, level)), suffix="_dev_{}_level{}_norm".format(metric, level), df=df_norm_piv, **line_params_norm)
[docs] def plot_unlabeled_hemisphere(path, cols, size=None, show=True): """Plot unlabeled hemisphere fractions as bar and line plots. Args: path (str): Path to data frame. cols (List[str]): Sequence of columns to plot. size (List[int]): Sequence of ``width, height`` to size the figure; defaults to None. show (bool): True to display the image; defaults to True. """ # load data frame and convert sample names to ages df = pd.read_csv(path) ages = ontology.rel_to_abs_ages(df["Sample"].unique()) df["Age"] = df["Sample"].map(ages) # generate a separate graph for each metric conds = df["Condition"].unique() for col in cols: title = "{}".format(col).replace("_", " ") y_label = "Fraction of hemisphere unlabeled" # plot as lines df_lines, regions = df_io.pivot_with_conditions( df, ["Age", "Condition"], "Region", col) plot_2d.plot_lines( config.filename, "Age", regions, linestyles=("--", "-"), labels=(y_label, "Post-Conceptional Age"), title=title, size=size, show=show, ignore_invis=True, suffix="_{}".format(col), df=df_lines, groups=conds) # plot as bars, pivoting value into separate columns by condition df_bars = df.pivot( index="Sample", columns="Condition", values=col).reset_index() plot_2d.plot_bars( config.filename, conds, col_groups="Sample", y_label=y_label, title=title, size=None, show=show, df=df_bars, prefix="{}_{}".format( os.path.splitext(config.filename)[0], col))
[docs] def meas_plot_zscores(path, metric_cols, extra_cols, composites, size=None, show=True): """Measure and plot z-scores for given columns in a data frame. Args: path (str): Path to data frame. metric_cols (List[str]): Sequence of column names for which to compute z-scores. extra_cols (List[str]): Additional columns to included in the output data frame. composites (List[Enum]): Sequence of enums specifying the combination, typically from :class:`vols.MetricCombos`. size (List[int]): Sequence of ``width, height`` to size the figure; defaults to None. show (bool): True to display the image; defaults to True. """ # generate z-scores df = pd.read_csv(path) df = df_io.zscore_df(df, "Region", metric_cols, extra_cols, True) # generate composite score column df_comb = df_io.combine_cols(df, composites) df_io.data_frames_to_csv( df_comb, libmag.insert_before_ext(config.filename, "_zhomogeneity")) # shift metrics from each condition to separate columns conds = np.unique(df["Condition"]) df = df_io.cond_to_cols_df( df, ["Sample", "Region"], "Condition", "original", metric_cols) path = libmag.insert_before_ext(config.filename, "_zscore") df_io.data_frames_to_csv(df, path) # display as probability plot lims = (-3, 3) plot_2d.plot_probability( path, conds, metric_cols, "Volume", xlim=lims, ylim=lims, title="Region Match Z-Scores", fig_size=size, show=show, suffix=None, df=df)
[docs] def meas_plot_coefvar(path, id_cols, cond_col, cond_base, metric_cols, composites, size_col=None, size=None, show=True): """Measure and plot coefficient of variation (CV) as a scatter plot. CV is computed two ways: - Based on columns and equation specified in ``composites``, applied across all samples regardless of group - For each metric in ``metric_cols``, separated by groups Args: path (str): Path to data frame. id_cols (List[str]): Sequence of columns to serve as index/indices. cond_col (str): Name of the condition column. cond_base (str): Name of the condition to which all other conditions will be normalized. metric_cols (List[str]): Sequence of column names for which to compute z-scores. composites (List[Enum]): Sequence of enums specifying the combination, typically from :class:`vols.MetricCombos`. size_col (str): Name of weighting column for coefficient of variation measurement; defaults to None. size (List[int]): Sequence of ``width, height`` to size the figure; defaults to None. show (bool): True to display the image; defaults to True. """ # measure coefficient of variation per sample-region regardless of group df = pd.read_csv(path) df = df_io.combine_cols(df, composites) df_io.data_frames_to_csv( df, libmag.insert_before_ext(config.filename, "_coefvar")) # measure CV within each condition and shift metrics from each # condition to separate columns df = df_io.coefvar_df(df, [*id_cols, cond_col], metric_cols, size_col) conds = np.unique(df[cond_col]) df = df_io.cond_to_cols_df(df, id_cols, cond_col, cond_base, metric_cols) path = libmag.insert_before_ext(config.filename, "_coefvartransp") df_io.data_frames_to_csv(df, path) # display CV measured by condition as probability plot lims = (0, 0.7) plot_2d.plot_probability( path, conds, metric_cols, "Volume", xlim=lims, ylim=lims, title="Coefficient of Variation", fig_size=size, show=show, suffix=None, df=df)
[docs] def smoothing_peak(df, thresh_label_loss=None, filter_size=None): """Extract the baseline and peak smoothing quality rows from the given data frame matching the given criteria. Args: df: Data frame from which to extract. thresh_label_loss: Only check rows below or equal to this fraction of label loss; defaults to None to ignore. filter_size: Only rows with the given filter size; defaults to None to ignore. Returns: New data frame with the baseline (filter size of 0) row and the row having the peak smoothing quality meeting criteria. """ if thresh_label_loss is not None: df = df.loc[ df[config.SmoothingMetrics.LABEL_LOSS.value] <= thresh_label_loss] if filter_size is not None: df = df.loc[np.isin( df[config.SmoothingMetrics.FILTER_SIZE.value], (filter_size, 0))] sm_qual = df[config.SmoothingMetrics.SM_QUALITY.value] df_peak = df.loc[np.logical_or( sm_qual == sm_qual.max(), df[config.SmoothingMetrics.FILTER_SIZE.value] == 0)] return df_peak
[docs] def plot_intensity_nuclei(paths, labels, size=None, show=True, unit=None): """Plot nuclei vs. intensity as a scatter plot. Args: paths (List[str]): Sequence of paths to CSV files. labels (List[str]): Sequence of label metrics corresponding to ``paths``. size (List[int]): Sequence of ``width, height`` to size the figure; defaults to None. show (bool): True to display the image; defaults to True. unit (str): Denominator unit for density plot; defaults to None. Returns: :obj:`pd.DataFrame`: Data frame with columns matching ``labels`` for the given ``paths`` concatenated. """ def plot(lbls, suffix=None, unit=None): cols_xy = [] for label in lbls: # get columns for the given label to plot on a given axis; assume # same order of labels for each group of columns so they correspond cols_xy.append([ c for c in df.columns if c.split(".")[0] == label]) names_group = None if cols_xy: # extract legend names assuming label.cond format names_group = np.unique([c.split(".")[1] for c in cols_xy[0]]) units = (["{}/{}".format(l.split("_")[0], unit) for l in lbls] if unit else (None, None)) lbls = [l.replace("_", " ") for l in lbls] title = "{} Vs. {} By Region".format(*lbls) plot_2d.plot_scatter( "vols_stats_intensVnuc", cols_xy[0], cols_xy[1], yunit=units[0], xunit=units[1], ylabel=lbls[0], xlabel=lbls[1], title=title, # col_annot=config.AtlasMetrics.REGION_ABBR.value, names_group=names_group, fig_size=size, show=show, suffix=suffix, df=df) if len(paths) < 2 or len(labels) < 2: return dfs = [pd.read_csv(path) for path in paths] # merge data frames with all columns ending with .mean, prepending labels extra_cols = [ config.AtlasMetrics.REGION.value, config.AtlasMetrics.REGION_ABBR.value, vols.LabelMetrics.Volume.name, ] tag = ".mean" df = df_io.append_cols( dfs[:2], labels, lambda x: x.lower().endswith(tag), extra_cols) dens = "{}_density" for col in df.columns: if col.startswith(labels): col_split = col.split(".") col_split[0] = dens.format(col_split[0]) df.loc[:, ".".join(col_split)] = ( df[col] / df[vols.LabelMetrics.Volume.name]) # strip the tag from column names names = {col: col.rsplit(tag)[0] for col in df.columns} df = df.rename(columns=names) df_io.data_frames_to_csv(df, "vols_stats_intensVnuc.csv") # plot labels and density labels plot(labels) plot([dens.format(l) for l in labels], "_density", unit) return df
[docs] def meas_improvement(path, col_effect, col_p, thresh_impr=0, thresh_p=0.05, col_wt=None, suffix=None, df=None): """Measure overall improvement and worsening for a column in a data frame. Args: path (str): Path of file to load into data frame. col_effect (str): Name of column with metric to measure. col_p (str): Name of column with p-values. thresh_impr (float): Threshold of effects below which are considered improved. thresh_p (float): Threshold of p-values below which are considered statistically significant. col_wt (str): Name of column for weighting. suffix (str): Output path suffix; defaults to None. df (:obj:`pd.DataFrame`): Data fram to use instead of loading from ``path``; defaults to None. Returns: :obj:`pd.DataFrame`: Data frame with improvement measurements. The data frame will be saved to a filename based on ``path``. """ def add_wt(mask_cond, mask_cond_ss, name): # add weighted metrics for the given condition, such as improved # vs. worsened metrics[col_wt] = [np.sum(df[col_wt])] wt_cond = df.loc[mask_cond, col_wt] wt_cond_ss = df.loc[mask_cond_ss, col_wt] # sum of weighting column fitting the condition (all and statistically # significant) metrics["{}_{}".format(col_wt, name)] = [np.sum(wt_cond)] metrics["{}_{}_ss".format(col_wt, name)] = [np.sum(wt_cond_ss)] # sum of filtered effect multiplied by weighting metrics["{}_{}_by_{}".format(col_effect, name, col_wt)] = [np.sum( wt_cond.multiply(df.loc[mask_cond, col_effect]))] metrics["{}_{}_by_{}_ss".format(col_effect, name, col_wt)] = [np.sum( wt_cond_ss.multiply(df.loc[mask_cond_ss, col_effect]))] if df is None: df = pd.read_csv(path) # masks of improved and worsened, all and statistically significant # for each, where improvement is above the given threshold effects = df[col_effect] mask_impr = effects > thresh_impr mask_ss = df[col_p] < thresh_p mask_impr_ss = mask_impr & mask_ss mask_wors = effects < thresh_impr mask_wors_ss = mask_wors & mask_ss metrics = { "n": [len(effects)], "n_impr": [np.sum(mask_impr)], "n_impr_ss": [np.sum(mask_impr_ss)], "n_wors": [np.sum(mask_wors)], "n_wors_ss": [np.sum(mask_wors_ss)], col_effect: [np.sum(effects)], "{}_impr".format(col_effect): [np.sum(effects[mask_impr])], "{}_impr_ss".format(col_effect): [np.sum(effects[mask_impr_ss])], "{}_wors".format(col_effect): [np.sum(effects[mask_wors])], "{}_wors_ss".format(col_effect): [np.sum(effects[mask_wors_ss])], } if col_wt: # add columns based on weighting column add_wt(mask_impr, mask_impr_ss, "impr") add_wt(mask_wors, mask_wors_ss, "wors") out_path = libmag.insert_before_ext(path, "_impr") if suffix: out_path = libmag.insert_before_ext(out_path, suffix) df_impr = df_io.dict_to_data_frame(metrics, out_path) # display transposed version for more compact view given large number # of columns, but save un-transposed to preserve data types df_io.print_data_frame(df_impr.T, index=True, header=False) return df_impr
[docs] def plot_clusters_by_label(path, z, suffix=None, show=True, scaling=None): """Plot separate sets of clusters for each label. Args: path (str): Base path to blobs file with clusters. z (int): z-plane to plot. suffix (str): Suffix for ``path``; defaults to None. show (bool): True to show; defaults to True. scaling (List): Sequence of scaling from blobs' coordinate space to that of :attr:`config.labels_img`. """ mod_path = path if suffix is not None: mod_path = libmag.insert_before_ext(path, suffix) blobs = np.load(libmag.combine_paths( mod_path, config.SUFFIX_BLOB_CLUSTERS)) label_ids = np.unique(blobs[:, 3]) fig, gs = plot_support.setup_fig( 1, 1, config.plot_labels[config.PlotLabels.SIZE]) ax = fig.add_subplot(gs[0, 0]) plot_support.hide_axes(ax) # plot underlying atlas np_io.setup_images(mod_path) if config.reg_suffixes[config.RegSuffixes.ATLAS]: # use atlas if explicitly set img = config.image5d else: # default to black background img = np.zeros_like(config.labels_img)[None] stacker = export_stack.setup_stack( img, mod_path, slice_vals=(z, z + 1), labels_imgs=(config.labels_img, config.borders_img)) stacker.build_stack( ax, config.plot_labels[config.PlotLabels.SCALE_BAR]) # export_stack.reg_planes_to_img( # (np.zeros(config.labels_img.shape[1:], dtype=int), # config.labels_img[z]), ax=ax) if scaling is not None: print("scaling blobs cluster coordinates by", scaling) blobs = blobs.astype(float) blobs[:, :3] = np.multiply(blobs[:, :3], scaling) blobs[:, 0] = np.floor(blobs[:, 0]) # plot nuclei by label, colored based on cluster size within each label colors = colormaps.discrete_colormap( len(np.unique(blobs[:, 4])), prioritize_default="cn") / 255. col_noise = (1, 1, 1, 1) for label_id in label_ids: if label_id == 0: # skip blobs in background continue # sort blobs within label by cluster size (descending order), # including clusters within all z-planes to keep same order across zs blobs_lbl = blobs[blobs[:, 3] == label_id] clus_lbls, clus_lbls_counts = np.unique( blobs_lbl[:, 4], return_counts=True) clus_lbls = clus_lbls[np.argsort(clus_lbls_counts)][::-1] blobs_lbl = blobs_lbl[blobs_lbl[:, 0] == z] for i, (clus_lbl, color) in enumerate(zip(clus_lbls, colors)): blobs_clus = blobs_lbl[blobs_lbl[:, 4] == clus_lbl] if len(blobs_clus) < 1: continue # default to small, translucent dominant cluster points size = 0.1 alpha = 0.5 if clus_lbl == -1: # color all noise points the same and emphasize points color = col_noise size = 0.5 alpha = 1 print(label_id, clus_lbl, color, len(blobs_clus)) ax.scatter( blobs_clus[:, 2], blobs_clus[:, 1], color=color, s=size, alpha=alpha) plot_support.save_fig(mod_path, config.savefig, "_clusplot") if show: plot_support.show()
[docs] def meas_landmark_dist( paths: Sequence[str], spacing: Optional[Sequence[float]] = None ) -> Optional[pd.DataFrame]: """Measure distance between corresponding labels in two labels images. Supports image transposition of the 2nd image by :meth:`magmap.atlas.atlas_refiner` with settings from :attr:`magmap.settings.config.transforms`. Args: paths: Sequence of paths to image files loadable by SimpleITK. spacing: Spacing/scaling in ``z,y,x``; defaults to None. Returns: Data frame of output metrics. """ if len(paths) < 2: print("Please provide paths to 2 labels images") return None labels_imgs = [ sitk_io.read_sitk_files(p, return_sitk=True)[1] for p in paths[:2]] if spacing is None and len(config.resolutions) > 0: # default to using loaded metadata spacing = config.resolutions[0] # transpose 2nd image with any config settings labels_imgs[1] = atlas_refiner.transpose_img(labels_imgs[1]) labels_imgs = [sitk_io.convert_img(m) for m in labels_imgs] # set up output path and sample name based on 1st image's path out_path = libmag.make_out_path(libmag.combine_paths( paths[0], "labelsdist.csv")) sample = libmag.get_filename_without_ext(paths[0]) # measure distance df = vols.labels_distance( labels_imgs[0], labels_imgs[1], spacing, out_path, sample) return df
[docs] def meas_dice(mask1, mask2, img=None): """Measure Dice Similarity Coefficient (DSC) between two images. Args: mask1 (:obj:`np.ndarray`): Mask of first image. mask2 (:obj:`np.ndarray`): Mask of second image with same shape as that of ``mask2``. img (:obj:`np.ndarray`): Intensity image whose values within each mask will be summed, of the same shape as that of the masks; defaults to None. Returns: :float: DSC between the two images, either based directly on the mask volumes or weighted by intensities of ``img`` if given. """ union = np.logical_and(mask1, mask2) if img is None: # use direct volume of masks out = (mask1, mask2) else: # weight volumes by underlying intensity union = img[union] out = (img[mask1], img[mask2]) denom = np.sum([np.sum(o) for o in out]) dsc = np.nan if denom == 0 else 2.0 * np.sum(union) / denom return dsc
[docs] def calc_sens_ppv(pos, true_pos, false_pos, false_neg): """Calculate sensitivity and positive predictive value (PPV), typically for assessing detection accuracy. Args: pos (int): Number of positives. true_pos (int): Number of correct detections. false_pos (int): Number of incorrect detections. false_neg (int): Number of missed detections. Returns: float, float, str: Sensitivity, PPV, and summary string. """ sens = float(true_pos) / pos if pos > 0 else np.nan all_pos = true_pos + false_pos ppv = float(true_pos) / all_pos if all_pos > 0 else np.nan msg = ("objects: {}\ndetected objects: {}\n" "false pos: {}\nfalse neg: {}\nsensitivity: {}\n" "PPV: {}\n".format(pos, true_pos, false_pos, false_neg, sens, ppv)) return sens, ppv, msg