Source code for magmap.cv.colocalizer

# Object colocalization
# Copyright The MagellanMapper Contributors
"""Colocalize objects in an image, typically in separate channels."""

from enum import Enum
import multiprocessing as mp
from typing import Dict, List, Optional, Sequence, Tuple

import pandas as pd
import numpy as np
from skimage import morphology

from magmap.cv import chunking, detector, stack_detect, verifier
from magmap.io import cli, df_io, libmag, sqlite
from magmap.settings import config

_logger = config.logger.getChild(__name__)


[docs] class BlobMatch: """Blob match storage class as a wrapper for a data frame of matches. Attributes: df: Data frame of matches with column names given by :class:`BlobMatch.Cols`; defaults to None. coords: Blob match coordinates, typically the mean coordinates of each blob pair; defaults to None. cmap: Colormap for each match; defaults to None. """
[docs] class Cols(Enum): """Blob match column names.""" MATCH_ID = "MatchID" ROI_ID = "RoiID" BLOB1_ID = "Blob1ID" BLOB1 = "Blob1" BLOB2_ID = "Blob2ID" BLOB2 = "Blob2" DIST = "Distance"
def __init__(self, matches=None, match_id=None, roi_id=None, blob1_id=None, blob2_id=None, df=None): """Initialize blob match object. Args: matches (list[list[ :class:`numpy.ndarray`, :class:`numpy.ndarray`, float]]: List of blob match lists, which each contain, ``blob1, blob2, distance``. Defaults to None, which sets the data frame to None. match_id (Sequence[int]): Sequence of match IDs, which should be of the same length as ``matches``; defaults to None. roi_id (Sequence[int]): Sequence of ROI IDs, which should be of the same length as ``matches``; defaults to None. blob1_id (Sequence[int]): Sequence of blob 1 IDs, which should be of the same length as ``matches``; defaults to None. blob2_id (Sequence[int]): Sequence of blob2 IDs, which should be of the same length as ``matches``; defaults to None. df (:class:`pandas.DataFrame`): Pandas data frame to set in place of any other arguments; defaults to None. """ self.df: Optional[pd.DataFrame] = None self.coords: Optional[np.ndarray] = None self.cmap: Optional[np.ndarray] = None if df is not None: # set data frame directly and ignore any other arguments self.df = df return if matches is None: # return since any other arguments must correspond to matches return matches_dict = {} for i, match in enumerate(matches): # assumes that all first sequences are of the same length vals = { BlobMatch.Cols.BLOB1: match[0], BlobMatch.Cols.BLOB2: match[1], BlobMatch.Cols.DIST: match[2], } if match_id is not None: vals[BlobMatch.Cols.MATCH_ID] = match_id[i] if roi_id is not None: vals[BlobMatch.Cols.ROI_ID] = roi_id[i] if blob1_id is not None: vals[BlobMatch.Cols.BLOB1_ID] = blob1_id[i] if blob2_id is not None: vals[BlobMatch.Cols.BLOB2_ID] = blob2_id[i] for key in BlobMatch.Cols: matches_dict.setdefault(key, []).append( vals[key] if key in vals else None) self.df = df_io.dict_to_data_frame(matches_dict)
[docs] def __repr__(self): """Format the underlying data frame.""" if self.df is None: return "Empty blob matches" return df_io.print_data_frame(self.df, show=False, max_rows=10)
[docs] def get_blobs(self, n: int) -> Optional[np.ndarray]: """Get blobs as a numpy array. Args: n: 1 for blob1, otherwise blob 2. Returns: Numpy array of the given blob type, or None if the :attr:`df` is None or the blob column does not exist. """ col = BlobMatch.Cols.BLOB1 if n == 1 else BlobMatch.Cols.BLOB2 if self.df is None or col.value not in self.df: return None df = self.df[col.value] if len(df) == 0: return None return np.vstack(df)
[docs] def get_blobs_all(self) -> Optional[List[np.ndarray]]: """Get all blobs in the blob matches. Returns: Tuple of ``(blobs1, blobs2)``, or None if either are None. """ blobs_all = [] for n in (1, 2): blobs = self.get_blobs(n) if blobs is None: return None blobs_all.append(blobs) return blobs_all
[docs] def update_blobs(self, fn, *args): """Update all blobs with the given function. Args: fn (func): Function that accepts the output of :meth:`get_blobs` separately for each set of blobs. *args (Any): Additional arguments to ``fn``. """ if self.df is None: return for i, col in enumerate((BlobMatch.Cols.BLOB1, BlobMatch.Cols.BLOB2)): blobs = self.get_blobs(i + 1) if blobs is not None: self.df[col.value] = fn(blobs, *args).tolist()
[docs] def get_mean_coords(self): """Get mean value of each pair of matched blobs. Returns: ``[n, 3]`` array of ``n`` blob pairs, also set to :attr:`coords`. """ blobs = self.get_blobs_all() if any([b is None for b in blobs]): return None self.coords = np.mean([b[:, :3] for b in blobs], axis=0) return self.coords
[docs] class StackColocalizer(object): """Colocalize blobs in blocks based on matching blobs across channels. Support shared memory for spawned multiprocessing, with fallback to pickling in forked multiprocessing. """ blobs: Optional["detector.Blobs"] = None match_tol: Optional[Sequence[float]] = None #: Channels to match; defaults to None. channels: Optional[Sequence[int]] = None
[docs] @classmethod def colocalize_block( cls, coord: Sequence[int], offset: Sequence[int], shape: Sequence[int], blobs: Optional["detector.Blobs"] = None, tol: Optional[Sequence[float]] = None, setup_cli: bool = False, channels: Optional[Sequence[int]] = None ) -> Tuple[Sequence[int], Dict[Tuple[int, int], "BlobMatch"]]: """Colocalize blobs from different channels within a block. Args: coord: Block coordinate. offset: Block offset within the full image in z,y,x. shape: Block shape in z,y,x. blobs: Blobs; defaults to None, which will use :attr:`blobs`. tol: Tolerance for colocalizing blobs; defaults to None to use :attr:`match_tol`. setup_cli: True to set up CLI arguments, typically for a spawned (rather than forked) environment; defaults to False. channels: Channels to match; defaults to None, where :attr:`channels` will be used. Returns: Tuple of: - ``coord``: ``coord``, for tracking multiprocessing - ``matches``: dictionary of matches """ if blobs is None: blobs = cls.blobs if tol is None: tol = cls.match_tol if channels is None: channels = cls.channels if setup_cli: # reload command-line parameters cli.process_cli_args() _logger.debug( "Match-based colocalizing blobs in ROI at offset %s, size %s", offset, shape) matches = colocalize_blobs_match( blobs, offset[::-1], shape[::-1], tol, channels=channels) return coord, matches
[docs] @classmethod def colocalize_stack( cls, shape: Sequence[int], blobs: "detector.Blobs", channels: Optional[Sequence[int]] = None ) -> Dict[Tuple[int, int], "BlobMatch"]: """Entry point to colocalizing blobs within a stack. Args: shape: Image shape in z,y,x. blobs: Blobs. channels: Channels to match; defaults to None. Returns: Dictionary of matches, where keys are tuples of the channel pairs, and values are blob match objects. """ chls = ("each pair of channels" if channels is None else f"channels: {channels}") _logger.info( "Colocalizing blobs based on matching blobs in %s", chls) # scale match tolerance based on block processing ROI size blocks = stack_detect.setup_blocks(config.roi_profile, shape) match_tol = np.multiply( blocks.overlap_base, config.roi_profile["verify_tol_factor"]) # re-split stack with ROI size adjusted to the inner padding plus # the raw overlap size inner_pad = verifier.setup_match_blobs_roi(match_tol)[2] inner_pad = np.add(inner_pad, blocks.overlap_base) sub_roi_slices, sub_rois_offsets = chunking.stack_splitter( shape, blocks.max_pixels, inner_pad[::-1]) is_fork = chunking.is_fork() if is_fork: # set shared data in forked multiprocessing cls.blobs = blobs cls.match_tol = match_tol pool = mp.Pool(processes=config.cpus) pool_results = [] for z in range(sub_roi_slices.shape[0]): for y in range(sub_roi_slices.shape[1]): for x in range(sub_roi_slices.shape[2]): coord = (z, y, x) offset = sub_rois_offsets[coord] slices = sub_roi_slices[coord] shape = [s.stop - s.start for s in slices] if is_fork: # use variables stored as class attributes pool_results.append(pool.apply_async( StackColocalizer.colocalize_block, args=(coord, offset, shape))) else: # pickle full set of variables blobs_roi = detector.Blobs(detector.get_blobs_in_roi( blobs.blobs, offset, shape)[0]) pool_results.append(pool.apply_async( StackColocalizer.colocalize_block, args=(coord, offset, shape, blobs_roi, match_tol, True, channels))) # dict of channel combos to blob matches data frame matches_all = {} for result in pool_results: coord, matches = result.get() count = 0 for key, val in matches.items(): matches_all.setdefault(key, []).append(val.df) count += len(val.df) _logger.info( "Adding %s matches from block at %s of %s", count, coord, np.add(sub_roi_slices.shape, -1)) pool.close() pool.join() # prune duplicates by taking matches with shortest distance for key in matches_all.keys(): matches_all[key] = pd.concat(matches_all[key]) if matches_all[key].size > 0: for blobi in (BlobMatch.Cols.BLOB1, BlobMatch.Cols.BLOB2): # convert blob column to ndarray to extract coords by column matches = matches_all[key] matches_uniq, matches_i, matches_inv, matches_cts = np.unique( np.vstack(matches[blobi.value])[:, :3], axis=0, return_index=True, return_inverse=True, return_counts=True) if np.sum(matches_cts > 1) > 0: # prune if at least one blob has been matched to # multiple other blobs singles = matches.iloc[matches_i[matches_cts == 1]] dups = [] for i, ct in enumerate(matches_cts): # include non-duplicates to retain index if ct <= 1: continue # get indices in orig matches at given unique array # index and take match with lowest dist matches_mult = matches.loc[matches_inv == i] dists = matches_mult[BlobMatch.Cols.DIST.value] min_dist = np.amin(dists) num_matches = len(matches_mult) if num_matches > 1: _logger.debug( "Pruning from %s matches of dist: %s", num_matches, dists) matches_mult = matches_mult.loc[dists == min_dist] # take first in case of any ties dups.append(matches_mult.iloc[[0]]) matches_all[key] = pd.concat((singles, pd.concat(dups))) _logger.info( "Colocalization matches for channels %s: %s", key, len(matches_all[key])) _logger.debug( "Blob matches for %s after pruning:\n%s", key, matches_all[key]) # store data frame in BlobMatch object matches_all[key] = BlobMatch(df=matches_all[key]) return matches_all
[docs] def colocalize_blobs(roi, blobs, thresh=None): """Co-localize blobs from different channels based on surrounding intensities. Thresholds for detection are first identified in each channel by taking the blobs in the given channel, finding the surrounding intensities, and taking a low (5th) percentile. Then for each channel, the surrounding intensities of blobs in that channel are compared with the thresholds in the other channels. Blobs exceeding any given threshold are considered to co-localize in that channel. Args: roi (:obj:`np.ndarray`): Region of interest as a 3D+channel array. blobs (:obj:`np.ndarray`): Blobs as a 2D array in the format ``[[z, y, x, radius, confirmation, truth, channel...], ...]``. thresh (int, float, str): Threshold percentile of intensities from pixels surrounding each blob in the given channel. Use "min" to instead take the mininimum average intensity of all blobs in the channel. Defaults to None to use "min". Returns: :obj:`np.ndarray`: 2D Numpy array of same length as ``blobs`` with a column for each channel where 1 indicates that the corresponding blob has signal is present in the given channels at the blob's location, and 0 indicates insufficient signal. """ if blobs is None or roi is None or len(roi.shape) < 4: return None if thresh is None: thresh = "min" print("Colocalizing blobs based on image intensity across channels") threshs = [] selem = morphology.ball(2) # find only blobs in ROI since blobs list may include blobs from immediate # surrounds, but ROI is not available for them blobs_roi, blobs_roi_mask = detector.get_blobs_in_roi( blobs, (0, 0, 0), roi.shape[:3], reverse=False) blobs_chl = detector.Blobs.get_blobs_channel(blobs_roi) blobs_range_chls = [] # get labeled masks of blobs for each channel and threshold intensities mask_roi = np.ones(roi.shape[:3], dtype=int) mask_roi_chls = [] for chl in range(roi.shape[3]): # label a mask with blob indices surrounding each blob blobs_chl_mask = np.isin(blobs_chl, chl) blobs_range = np.where(blobs_chl_mask)[0] blobs_range_chls.append(blobs_range) mask = np.copy(mask_roi) * -1 mask[tuple(libmag.coords_for_indexing( blobs_roi[blobs_chl_mask, :3].astype(int)))] = blobs_range try: # Scikit-image >= v0.20 mask = morphology.dilation(mask, footprint=selem) except TypeError: # Scikit-image < v0.20 mask = morphology.dilation(mask, selem=selem) mask_roi_chls.append(mask) if thresh == "min": # set minimum average surrounding intensity of all blobs as thresh threshs.append( None if len(blobs_range) == 0 else np.amin([ np.mean(roi[mask == b, chl]) for b in blobs_range])) else: # set a percentile of intensities surrounding all blobs in channel # as threshold for that channel, or the whole ROI if no blobs mask_blobs = mask >= 0 roi_mask = roi if np.sum(mask_blobs) < 1 else roi[mask_blobs, chl] threshs.append(np.percentile(roi_mask, thresh)) channels = np.unique(detector.Blobs.get_blobs_channel(blobs_roi)).astype(int) colocs_roi = np.zeros((blobs_roi.shape[0], roi.shape[3]), dtype=np.uint8) for chl in channels: # get labeled mask of blobs in the given channel mask = mask_roi_chls[chl] blobs_range = blobs_range_chls[chl] for chl_other in channels: if threshs[chl_other] is None: continue for blobi in blobs_range: # find surrounding intensity of blob in another channel mask_blob = mask == blobi blob_avg = np.mean(roi[mask_blob, chl_other]) if config.verbose: print( blobi, detector.Blobs.get_blobs_channel( blobs_roi[blobi]), blobs_roi[blobi, :3], blob_avg, threshs[chl_other]) if blob_avg >= threshs[chl_other]: # intensities in another channel around blob's position # is above that channel's threshold colocs_roi[blobi, chl_other] = 1 # create array for all blobs including those outside ROI colocs = np.zeros((blobs.shape[0], roi.shape[3]), dtype=np.uint8) colocs[blobs_roi_mask] = colocs_roi if config.verbose: for i, (blob, coloc) in enumerate(zip(blobs_roi, colocs)): print(i, detector.Blobs.get_blobs_channel(blob), blob[:3], coloc) return colocs
[docs] def colocalize_blobs_match( blobs: "detector.Blobs", offset: Sequence[int], size: Sequence[int], tol: Sequence[float], inner_padding: Optional[Sequence[int]] = None, channels: Optional[Sequence[int]] = None ) -> Optional[Dict[Tuple[int, int], "BlobMatch"]]: """Co-localize blobs in separate channels but the same ROI by finding optimal blob matches. Args: blobs: Blobs. offset: ROI offset given as x,y,z. size: ROI shape given as x,y,z. tol: Tolerances for matching given as x,y,z inner_padding: ROI padding given as x,y,z; defaults to None to use the padding based on ``tol``. channels: Indices of channels to colocalize. Defaults to None, which will use all channels in ``blobs``. Returns: Dictionary where keys are tuples of the two channels compared and values are blob matches objects, or None if ``blobs`` is None. """ if blobs is None: return None thresh, scaling, inner_pad, resize, blobs_roi = \ verifier.setup_match_blobs_roi(tol, blobs) if inner_padding is None: inner_padding = inner_pad matches_chls = {} # get channels in blobs blob_chls = np.unique(blobs.get_blobs_channel(blobs_roi)).astype(int) if channels is not None: # filter blob channels to only include specified channels blob_chls = [c for c in blob_chls if c in channels] for chl in blob_chls: # pair channels blobs_chl = blobs.blobs_in_channel(blobs_roi, chl) for chl_other in blob_chls: # prevent duplicates by skipping other channels below given channel if chl >= chl_other: continue # find colocalizations between blobs from one channel to blobs # in another channel blobs_chl_other = blobs.blobs_in_channel(blobs_roi, chl_other) blobs_inner_plus, blobs_truth_inner_plus, offset_inner, \ size_inner, matches = verifier.match_blobs_roi( blobs_chl_other, blobs_chl, offset, size, thresh, scaling, inner_padding, resize) # reset truth and confirmation blob flags in matches chl_combo = (chl, chl_other) matches.update_blobs(blobs.set_blob_truth, -1) matches.update_blobs(blobs.set_blob_confirmed, -1) matches_chls[chl_combo] = matches return matches_chls
def _get_roi_id( db: "sqlite.ClrDB", offset: Sequence[int], shape: Sequence[int], exp_name: Optional[str] = None) -> int: """Get database ROI ID for the given ROI position within the main image5d. Args: db: Database object. offset: ROI offset in z,y,x. shape: ROI shape in z,y,x. exp_name: Name of experiment; defaults to None to attempt discovery through any image loaded to :attr:`config.img5d`. Returns: int: ROI ID or found or inserted ROI. """ if exp_name is None: exp_name = sqlite.get_exp_name( config.img5d.path_img if config.img5d else None) exp_id = db.select_or_insert_experiment(exp_name, None) roi_id = sqlite.select_or_insert_roi( db.conn, db.cur, exp_id, config.series, offset, shape)[0] return roi_id
[docs] def insert_matches(db, matches): """Insert matches into database for a whole image. Args: db (:obj:`sqlite.ClrDB`): Database object. matches (dict[tuple[int, int], :class:`BlobMatch`): Dictionary of channel combo tuples to blob match objects. """ # use size of 0 for each dimension for whole-image ROI, which avoids # the need to discover the image size roi_id = _get_roi_id(db, (0, 0, 0), (0, 0, 0)) for chl_matches in matches.values(): # insert blobs and matches for the given channel combo blobs = chl_matches.get_blobs_all() if blobs is not None: sqlite.insert_blobs(db.conn, db.cur, roi_id, np.vstack(blobs)) config.db.insert_blob_matches(roi_id, chl_matches)
[docs] def select_matches(db, channels, offset=None, shape=None, exp_name=None): """Select blob matches for the given region from a database. Blob matches are assumed to have been processed from the whole image. To retrieve matches from a selected ROI, use :meth:`magmap.io.sqlite.ClrDB.select_blob_matches` instead. Args: db (:obj:`sqlite.ClrDB`): Database object. channels (list[int]): List of channels. offset (list[int]): ROI offset in z,y,x; defaults to None to use ``(0, 0, 0)``. shape (list[int]): ROI shape in z,y,x; defaults to None to use ``(0, 0, 0)``. exp_name (str): Name of experiment in ``db``. Returns: dict[tuple[int, int], list[:obj:`BlobMatch`]: Dictionary where keys are tuples of the two channels compared and values are a list of blob matches. None if no blob matches are found. """ # get ROI for whole image, where whole-image matches are stored roi_id = _get_roi_id(db, (0, 0, 0), (0, 0, 0), exp_name) blob_matches = db.select_blob_matches(roi_id, offset, shape) if blob_matches.df is None or len(blob_matches.df) == 0: _logger.info("No blob matches found") return None matches = {} for chl in channels: # pair channels for chl_other in channels: if chl >= chl_other: continue # select matches for blobs in the first channel of the channel pair, # assuming chls were paired this way during insertion blobs1 = blob_matches.get_blobs(1) if blobs1 is None: continue chl_matches = BlobMatch(df=blob_matches.df.loc[ detector.Blobs.get_blobs_channel(blobs1) == chl]) # also extract only blob1's paired to blob2's in the 2nd channel blobs2 = chl_matches.get_blobs(2) if blobs2 is None: continue chl_matches.df = chl_matches.df.loc[ detector.Blobs.get_blobs_channel(blobs2) == chl_other] # store the matches with chl1-chl2 as key matches[(chl, chl_other)] = chl_matches return matches