Source code for magmap.io.export_regions

# Region labels export to data frames and CSV files
# Author: David Young, 2019, 2023
"""Region labels export to data frames and CSV files.

Convert regions from ontology files or atlases to data frames.
"""
import itertools
import os
import csv
from collections import OrderedDict
from time import time
from typing import Callable, Dict, Optional, Sequence, Tuple

try:
    import SimpleITK as sitk
except ImportError:
    sitk = None
import numpy as np
import pandas as pd

from magmap.io import libmag
from magmap.io import np_io
from magmap.atlas import labels_meta, ontology
from magmap.cv import chunking, colocalizer, cv_nd, detector
from magmap.io import df_io, sitk_io, sqlite
from magmap.plot import colormaps
from magmap.settings import config, atlas_prof
from magmap.stats import vols

_logger = config.logger.getChild(__name__)


[docs] def export_region_ids( labels_ref_lookup: Dict, path: str, level: Optional[int] = None, drawn_labels_only: bool = False) -> pd.DataFrame: """Export region IDs from annotation reference reverse mapped dictionary. Args: labels_ref_lookup: The labels reference lookup, assumed to be an OrderedDict generated by :func:`ontology.create_reverse_lookup` to look up by ID while preserving key order to ensure that parents of any child will be reached prior to the child. path: Path to output CSV file; if does not end with ``.csv``, it will be added. level: Level at which to find parent for each label. If None, each the immediate parent will be retrieved for each label. drawn_labels_only: True to export only the drawn labels for atlas labels in the same folder as ``labels_ref_lookup``. The RGB values used for the currently loaded atlas will also be shown in a corresponding Excel file (requires `openpyxl` and `Jinja2` packages). If False, the full set of labels in ``labels_ref_lookup`` is used. Returns: Pandas data frame of the region IDs and corresponding names. """ def color_cells(s): # convert RGB to hex values since Pandas Excel export only supports # named colors or hex (as of v0.22) css = ["background-color: #{:02x}{:02x}{:02x}".format(*c) for c in s] return css ext = ".csv" path_csv = path if path.endswith(ext) else path + ext # find ancestor for each label at the given level label_parents = ontology.labels_to_parent( labels_ref_lookup, level, allow_parent_same_level=True) # load labels cols = [config.AtlasMetrics.REGION.value, config.AtlasMetrics.REGION_ABBR.value, config.AtlasMetrics.REGION_NAME.value, config.AtlasMetrics.LEVEL.value, config.AtlasMetrics.PARENT.value] data = OrderedDict() label_ids = sitk_io.find_atlas_labels( config.load_labels, drawn_labels_only, labels_ref_lookup) # set up label image colormap for RGB values using original labels, # which are only available if showing drawn labels; use same settings # as in image setup, including duplicate set of colors for neg values # since the color randomization depends on the exact number of colors cm = colormaps.setup_labels_cmap(None, use_orig_labels=True) rgbs = cm.cmap_labels if rgbs is not None: # remove colors for duplicated labels, keeping second half rgbs = rgbs[len(rgbs)//2:] cols.append("RGB") for i, key in enumerate(label_ids): # get label dict label = labels_ref_lookup.get(key) if label is None: continue # ID of parent at label_parents' level parent = label_parents[key] vals = [key, label[ontology.NODE][config.ABAKeys.ACRONYM.value], label[ontology.NODE][config.ABAKeys.NAME.value], label[ontology.NODE][config.ABAKeys.LEVEL.value], parent] if rgbs is not None: vals.append(rgbs[i, :3]) for col, val in zip(cols, vals): data.setdefault(col, []).append(val) df = df_io.dict_to_data_frame(data, path_csv) if rgbs is not None: try: # color cells in RGB column df = df.style.apply(color_cells, subset="RGB") except ImportError: _logger.warn( "Skipping RGB coloring. %s", config.format_import_err( "Jinja2", task="coloring table cells")) path_xlsx = "{}.xlsx".format(os.path.splitext(path)[0]) try: # export Excel file, necessary for styling df.to_excel(path_xlsx) _logger.info("Exported regions to styled spreadsheet: %s", path_xlsx) except ModuleNotFoundError: _logger.warn( "Skipping Excel export. %s", config.format_import_err( "openpyxl", task="formatting Excel files")) return df
[docs] def export_region_network(labels_ref_lookup, path): """Export region network file showing relationships among regions according to the SIF specification. See http://manual.cytoscape.org/en/stable/Supported_Network_File_Formats.html#sif-format for file format information. Args: labels_ref_lookup: The labels reference lookup, assumed to be an OrderedDict generated by :func:`ontology.create_reverse_lookup` to look up by ID while preserving key order to ensure that parents of any child will be reached prior to the child. path: Path to output SIF file; if does not end with ``.sif``, it will be added. """ ext = ".sif" if not path.endswith(ext): path += ext network = {} for key in labels_ref_lookup.keys(): if key < 0: continue # only use original, non-neg region IDs label = labels_ref_lookup[key] parents = label.get(ontology.PARENT_IDS) if parents: for parent in parents[::-1]: # work backward since closest parent listed last #print("{} looking for parent {} in network".format(key, parent)) network_parent = network.get(parent) if network_parent is not None: # assume that all parents will have already been entered # into the network dict since the keys were entered in # hierarchical order and maintain their order of entry network_parent.append(key) break # all regions have a node, even if connected to no one network[key] = [] with open(path, "w", newline="") as csv_file: stats_writer = csv.writer(csv_file, delimiter=" ") # each region will have a line along with any of its immediate children for key in network.keys(): children = network[key] row = [str(key)] if children: row.extend(["pp", *children]) stats_writer.writerow(row) print("exported region network: \"{}\"".format(path))
[docs] def export_common_labels(img_paths, output_path): """Export data frame combining all label IDs from the given atlases, showing the presence of labels in each atlas. Args: img_paths: Image paths from which to load the corresponding labels images. output_path: Path to export data frame to .csv. Returns: Data frame with label IDs as indices, column for each atlas, and cells where 1 indicates that the given atlas has the corresponding label. """ labels_dict = {} for img_path in img_paths: name = libmag.get_filename_without_ext(img_path) labels_np = sitk_io.load_registered_img( img_path, config.RegNames.IMG_LABELS.value) # only use pos labels since assume neg labels are merely mirrored labels_unique = np.unique(labels_np[labels_np >= 0]) labels_dict[name] = pd.Series( np.ones(len(labels_unique), dtype=int), index=labels_unique) df = pd.DataFrame(labels_dict) df.sort_index() df.to_csv(output_path) print("common labels exported to {}".format(output_path)) return df
[docs] def make_density_image( img_path: str, scale: Optional[float] = None, shape: Optional[Sequence[int]] = None, suffix: Optional[str] = None, labels_img_sitk: Optional["sitk.Image"] = None, channel: Optional[Sequence[int]] = None, matches: Dict[Tuple[int, int], "colocalizer.BlobMatch"] = None, atlas_profile: Optional["atlas_prof.AtlasProfile"] = None, include: Sequence[int] = None ) -> Tuple[np.ndarray, str]: """Make a density image based on associated blobs. Uses the size and/or resolutions of the original image stored in the blobs if available to determine scaling between the blobs and the output image. Otherwise, attempts to load the original image or at least its metadata. The voxel sizes for the blobs is determined by giving an output image or shape. If ``matches`` is given, a heat map will be generated for each set of channels given in the dictionary. Otherwise, if the loaded blobs file has intensity-based colocalizations, a heat map will be generated for each combination of channels. Args: img_path: Path to image, which will be used to indentify the blobs file. scale: Scaling factor between the blobs' space and the output space; defaults to None to use the register. Scaling is found by :meth:`magmap.np_io.find_scaling`. shape: Output shape. Defaults to None, in which case the shape will match ``labels_img_sitk``. suffix: Modifier to append to end of ``img_path`` basename for registered image files that were output to a modified name; defaults to None. labels_img_sitk: Labels image. Defaults to None, in which case a registered labels image will be loaded. channel: Sequence of channels to include in density image. For multiple channels, blobs from all these channels are combined into one heatmap. Defaults to None to use all channels. matches: Dictionary of channel combinations to blob matches; defaults to None. atlas_profile: Atlas profile, used for scaling; defaults to None. include: Sequence of blob ``confirmed`` flags to include; defaults to None, in which case all flags will be included. Returns: Tuple of the density image as a Numpy array in the same shape as the opened image and the original and ``img_path`` to track such as for multiprocessing. """ def make_heat_map(): # build heat map to store densities per label px and save to file coord_scaled = ontology.scale_coords( blobs_chl[:, :3], scaling, labels_img.shape) _logger.debug("Scaled coords:\n%s", coord_scaled) return cv_nd.build_heat_map(labels_img.shape, coord_scaled) # set up paths and get labels image _logger.info("\n\nGenerating heat map from blobs") mod_path = img_path if suffix is not None: mod_path = libmag.insert_before_ext(img_path, suffix) # load blobs blobs = detector.Blobs().load_blobs(np_io.img_to_blobs_path(img_path)) # prepare output image is_2d = False if (shape is not None and blobs.roi_size is not None and blobs.resolutions is not None): # use target shape provided directly; extract image size stored in # blobs archive, assuming ROI size is full the full image scaling = np.divide(shape, blobs.roi_size) labels_spacing = np.divide(blobs.resolutions[0], scaling) labels_img = np.zeros(shape, dtype=np.uint8) labels_img_sitk = sitk_io.convert_img(labels_img) labels_img_sitk.SetSpacing(labels_spacing[::-1]) else: # default to use labels image as the size of the output image if labels_img_sitk is None: labels_img_sitk = sitk_io.load_registered_img( mod_path, config.RegNames.IMG_LABELS.value, get_sitk=True) labels_img = sitk_io.convert_img(labels_img_sitk) is_2d = labels_img.ndim == 2 labels_res = list(labels_img_sitk.GetSpacing())[::-1] if is_2d: # temporarily convert 2D images to 3D labels_img = labels_img[None] labels_res.insert(0, 1) if blobs.resolutions is not None: # find scaling based on blob to labels image resolution ratio scaling = np.divide(blobs.resolutions[0], labels_res) else: # find the scaling between the blobs and the labels image # TODO: remove target_size since it can be set by shape? target_size = ( None if atlas_profile is None else atlas_profile["target_size"]) scaling = np_io.find_scaling( img_path, labels_img.shape, scale, target_size)[0] if shape is not None: # scale blob coordinates and heat map to an alternative final shape scaling = np.divide(shape, np.divide(labels_img.shape, scaling)) labels_spacing = np.multiply( labels_res, np.divide(labels_img.shape, shape)) labels_img = np.zeros(shape, dtype=labels_img.dtype) labels_img_sitk.SetSpacing(labels_spacing[::-1]) _logger.debug("Using image scaling: {}".format(scaling)) blobs_chl = blobs.blobs _logger.info("Initial number of blobs: %s", len(blobs_chl)) if channel is not None: # filter blobs by channel _logger.info( "Using blobs from channel(s), combining if multiple channels: %s", channel) blobs_chl = blobs_chl[np.isin(detector.Blobs.get_blobs_channel( blobs_chl), channel)] _logger.info("Number of remaining blobs: %s", len(blobs_chl)) if include is not None: # filter blobs by confirmation flag _logger.info("Using blobs with confirmed flag(s): %s", include) blobs_chl = blobs_chl[np.isin(detector.Blobs.get_blob_confirmed( blobs_chl), include)] _logger.info("Number of remaining blobs: %s", len(blobs_chl)) # annotate blobs based on position heat_map = make_heat_map() if is_2d: # convert back to 2D heat_map = heat_map[0] imgs_write = { config.RegNames.IMG_HEAT_MAP.value: sitk_io.replace_sitk_with_numpy(labels_img_sitk, heat_map)} heat_colocs = None if matches: # create heat maps for match-based colocalization combos heat_colocs = [] for chl_combo, chl_matches in matches.items(): _logger.info( "Generating match-based colocalization heat map " "for channel combo: %s", chl_combo) # use blobs in first channel of each channel pair for simplicity blobs_chl = chl_matches.get_blobs(1) heat_colocs.append(make_heat_map()) elif blobs.colocalizations is not None: # create heat map for each intensity-based colocalization combo # as a separate channel in output image blob_chls = range(blobs.colocalizations.shape[1]) blob_chls_len = len(blob_chls) if blob_chls_len > 1: # get all channel combos that include given channels combos = [] chls = blob_chls if channel is None else channel for r in range(2, blob_chls_len + 1): combos.extend( [tuple(c) for c in itertools.combinations(blob_chls, r) if all([h in c for h in chls])]) heat_colocs = [] for combo in combos: _logger.info( "Generating intensity-based colocalization heat map " "for channel combo: %s", combo) blobs_chl = blobs.blobs[np.all(np.equal( blobs.colocalizations[:, combo], 1), axis=1)] heat_colocs.append(make_heat_map()) if heat_colocs is not None: # combine heat maps into single image heat_colocs = np.stack(heat_colocs, axis=3) if is_2d: # convert back to 2D heat_colocs = heat_colocs[0] imgs_write[config.RegNames.IMG_HEAT_COLOC.value] = \ sitk_io.replace_sitk_with_numpy(labels_img_sitk, heat_colocs, True) # write images to file sitk_io.write_reg_images(imgs_write, mod_path) return heat_map, img_path
[docs] def make_density_images_mp(img_paths, scale=None, shape=None, suffix=None, channel=None): """Make density images for a list of files as a multiprocessing wrapper for :func:``make_density_image`` Args: img_paths (List[str]): Sequence of image paths, which will be used to indentify the blob files. scale (int, float): Rescaling factor as a scalar value. If set, the corresponding image for this factor will be opened. If None, the full size image will be used. Defaults to None. shape (List[int]): Sequence of target shape defining the voxels for the density map; defaults to None. suffix (str): Modifier to append to end of ``img_path`` basename for registered image files that were output to a modified name; defaults to None. channel (List[int]): Sequence of channels to include in density image; defaults to None to use all channels. """ start_time = time() pool = chunking.get_mp_pool() pool_results = [] for img_path in img_paths: print("Making density image from blobs related to:", img_path) if config.channel: # get blob matches for the given channels if available; must load # from db outside of multiprocessing to avoid MemoryError matches = colocalizer.select_matches( config.db, config.channel, exp_name=sqlite.get_exp_name(img_path)) else: matches = None pool_results.append(pool.apply_async( make_density_image, args=(img_path, scale, shape, suffix, None, channel, matches, config.atlas_profile, config.classifier.include))) for result in pool_results: _, path = result.get() print("finished {}".format(path)) pool.close() pool.join() print("time elapsed for making density images:", time() - start_time)
[docs] def map_metric_to_labels_img( img_path: str, df_path: str, meas: str, fn_avg: Callable, prefix: Optional[str] = None, show: bool = False, level: Optional[int] = None, meas_path_name: Optional[str] = None, col_wt: Optional[str] = None): """Replace labels in an image with metric values. Can calculate the differences in metrics for each given region between two conditions or simply show a single set of metric values. Args: img_path: Path to the base image from which the corresponding registered image will be found. df_path: Path to data frame with metrics for the labels. meas: Name of colum in data frame with the chosen measurement. fn_avg: Function to apply to the set of measurements, such as a mean. Can be None if ``df_path`` points to a stats file from which to extract metrics directly in :meth:``vols.map_meas_to_labels``. prefix: Start of path for output image; defaults to None to use ``img_path`` instead. show: True to show the images after generating them. level: Ontological level at which to look up and show labels. Assume that labels level image corresponding to this value has already been generated by :meth:``make_labels_level_img``. Defaults to None to use only drawn labels. meas_path_name: Name to use in place of `meas` in output path. col_wt (str): Name of column to use for weighting. """ # load labels image and data frame before generating map for the # given metric of the chosen measurement _logger.info( "Generating labels difference image for %s from %s", meas, df_path) if level is None: # get registered labels image suffix from CLI arg reg_name = config.reg_suffixes[config.RegSuffixes.ANNOTATION] if not reg_name: reg_name = config.RegNames.IMG_LABELS.value else: # get suffix based on ontology level reg_name = config.RegNames.IMG_LABELS_LEVEL.value.format(level) # load registered labels image labels_sitk = sitk_io.load_registered_img(img_path, reg_name, get_sitk=True) labels_np = sitk_io.convert_img(labels_sitk) # load metrics and map to labels df = pd.read_csv(df_path) labels_diff = vols.map_meas_to_labels( labels_np, df, meas, fn_avg, col_wt=col_wt) if labels_diff is None: return labels_diff_sitk = sitk_io.replace_sitk_with_numpy(labels_sitk, labels_diff) # save and show labels difference image using measurement name in # output path or overriding with custom name meas_path = meas if meas_path_name is None else meas_path_name reg_diff = libmag.insert_before_ext( config.RegNames.IMG_LABELS_DIFF.value, meas_path, "_") if fn_avg is not None: # add function name to output path if given reg_diff = libmag.insert_before_ext( reg_diff, fn_avg.__name__, "_") imgs_write = {reg_diff: labels_diff_sitk} out_path = prefix if prefix else img_path sitk_io.write_reg_images(imgs_write, out_path) if show and sitk: for img in imgs_write.values(): if img: sitk.Show(img)
[docs] def make_labels_level_img( img_path: Optional[str], level: int, prefix: Optional[str] = None, show: bool = False ) -> Dict[str, "sitk.Image"]: """Replace labels in an image with their parents at the given level. Parents will be determined from the labels ontology file, which will be loaded from the image's labels metadata if available. This file can also be set in :config:`load_labels`. Labels that do not fall within a parent at that level will remain in place. Also generates edge images. Args: img_path: Path to the base image from which the corresponding registered image will be found. Can be None, where the globally set up image stored in :attr:`magmap.settings.config` will be used instead. If so, `prefix` must be given to specify the output path. level: Ontological level at which to group child labels. prefix: Start of path for output image; defaults to None to use ``img_path`` instead. show: True to show the images after generating them; defaults to False. Returns: Dictionary of registered image suffix to SimpleITK image. Raises: `ValueError` if `img_path` and `prefix` are both None. """ if img_path is None: if not prefix: raise ValueError("Must set 'prefix' if 'img_path' is None") # use the globally set up image stored in config labels_sitk = config.labels_img_sitk ref = config.labels_ref else: # load original labels image labels_sitk = sitk_io.load_registered_img( img_path, config.RegNames.IMG_LABELS.value, get_sitk=True) # set up ontology dict path_ref = config.load_labels if path_ref is None: # find ontology path from labels metadata lbls_meta = labels_meta.LabelsMeta(img_path).load() path_ref = lbls_meta.path_ref ref = ontology.LabelsRef(path_ref).load() # remap labels to given level labels_np = sitk_io.convert_img(labels_sitk) labels_np = ontology.make_labels_level(labels_np, ref, level) labels_level_sitk = sitk_io.replace_sitk_with_numpy(labels_sitk, labels_np) # generate an edge image at this level # TODO: consider moving into separate function labels_edge = vols.LabelToEdge.make_labels_edge(labels_np) labels_edge_sitk = sitk_io.replace_sitk_with_numpy(labels_sitk, labels_edge) # write and optionally display labels level image imgs_write = { config.RegNames.IMG_LABELS_LEVEL.value.format(level): labels_level_sitk, config.RegNames.IMG_LABELS_EDGE_LEVEL.value.format(level): labels_edge_sitk, } out_path = prefix if prefix else img_path sitk_io.write_reg_images(imgs_write, out_path) if show and sitk: for img in imgs_write.values(): if img: sitk.Show(img) return imgs_write