Source code for magmap.stats.clustering

# Cluster measurement.
# Author: David Young, 2019
"""Clustering measurements."""

import os

import numpy as np
import pandas as pd
try:
    from sklearn import cluster
    from sklearn import neighbors
except ImportError:
    cluster = None
    neighbors = None

from magmap.cv import chunking, detector
from magmap.settings import config
from magmap.io import libmag
from magmap.io import np_io
from magmap.atlas import ontology
from magmap.plot import plot_2d
from magmap.settings import profiles
from magmap.io import sitk_io
from magmap.io import df_io


[docs] def knn_dist(blobs, n, max_dist=None, max_pts=None, show=True): """Measure the k-nearest-neighbors distance. Args: blobs (:obj:`np.ndarray`): Sequence given as ``[n_samples, n_features]``, where features typically is of the form, ``[z, y, x, ...]``. n (int): Number of neighbors. The farthest neighbor will be used for sorting, filtering, and plotting. max_dist (float): Cap the maximum distance of points to plot, given as factor of the median distance; defaults to None to show neighbors of all distances. max_pts (int): Cap the maximum number of points for the zoomed plot if the 90th percentile exceeds this number; defaults to None. show (bool): True to immediately show the plot the distances; defaults to True. Will still plot and save in the background if :attr:`config.savefig` is set. Returns: :obj:`neighbors.NearestNeighbors`, :obj:`np.ndarray`, List[:obj:`pd.DataFrame`]: Tuple of ``NearestNeighbors`` object, all distances from ``kneighbors`` sorted by the ``n``th neighbor, and a list of data frames at different zoom levels (``[df_overview, df_zoomed]``). """ def plot(mod=""): # plot sorted distances as line and return data frame df = pd.DataFrame( {"point": np.arange(len(dist_disp)), "dist": dist_disp}) plot_2d.plot_lines( "knn_dist{}".format(mod), "point", ("dist", ), df=df, show=show, title=config.plot_labels[config.PlotLabels.TITLE]) return df if not neighbors: raise ImportError( config.format_import_err("scikit-learn", task="kNN distances")) #blobs = blobs[::int(len(blobs) / 1000)] # TESTING: small num of blobs knn = neighbors.NearestNeighbors(n, n_jobs=-1).fit(blobs) print(knn) dist, _ = knn.kneighbors(blobs) # sort distances based on nth neighbors dist = dist[np.argsort(dist[:, n - 1])] dfs = [] if show or config.savefig: distn = dist[:, n - 1] if max_dist: # remove all distances where nth neighbor is beyond threshold distn = distn[distn < max_dist * np.median(distn)] len_distn = len(distn) # line plot of nth neighbor distances by ascending order, # downsampling for overview plot step = int(len_distn / 1000) if step < 1: step = 1 dist_disp = distn[::step] dfs.append(plot()) # zoom to >= 90th percentile or max points, whichever is smaller above_pct = distn > np.percentile(distn, 90) if max_pts and max_pts < np.sum(above_pct): print("limiting zoomed plot to last {} points".format(max_pts)) dist_disp = distn[len_distn-max_pts:] else: dist_disp = distn[above_pct] dfs.append(plot("_zoomed")) return knn, dist, dfs
[docs] def plot_knns(img_paths, suffix=None, show=False, names=None): """Plot k-nearest-neighbor distances for multiple sets of blobs, overlaying on a single plot. Args: img_paths (List[str]): Base paths from which registered labels and blobs files will be found and output blobs file save location will be constructed. suffix (str): Suffix for ``path``; defaults to None. show (bool): True to plot the distances; defaults to False. names (List[str]): Sequence of names corresponding to ``img_paths`` for the plot legend. """ cluster_settings = config.atlas_profile[ profiles.RegKeys.METRICS_CLUSTER] knn_n = cluster_settings[profiles.RegKeys.KNN_N] if not knn_n: knn_n = cluster_settings[profiles.RegKeys.DBSCAN_MINPTS] - 1 print("Calculating k-nearest-neighbor distances and plotting distances " "for neighbor {}".format(knn_n)) # set up combined data frames for all samples at each zoom level df_keys = ("ov", "zoom") dfs_comb = {key: [] for key in df_keys} names_disp = names if names else [] for i, img_path in enumerate(img_paths): # load blobs associated with image mod_path = img_path if suffix is not None: mod_path = libmag.insert_before_ext(img_path, suffix) labels_img_np = sitk_io.load_registered_img( mod_path, config.RegNames.IMG_LABELS.value) blobs = detector.Blobs().load_blobs(np_io.img_to_blobs_path(img_path)) scaling, res = np_io.find_scaling( img_path, labels_img_np.shape, load_size=config.atlas_profile["target_size"]) if blobs is None: libmag.warn("unable to load nuclei coordinates for", img_path) continue # convert to physical units and display k-nearest-neighbors for nuclei blobs_phys = np.multiply(blobs.blobs[:, :3], res) # TESTING: given the same blobs, simply shift #blobs = np.multiply(blobs[i*10000000:, :3], res) _, _, dfs = knn_dist(blobs_phys, knn_n, 2, 1000000, False) if names is None: # default to naming from filename names_disp.append(os.path.basename(mod_path)) for j, df in enumerate(dfs): dfs_comb[df_keys[j]].append(df) for key in dfs_comb: # combine data frames at each zoom level, save, and plot with # different colors for each image df = df_io.join_dfs(dfs_comb[key], "point") dist_cols = [col for col in df.columns if col.startswith("dist")] rename_cols = {col: name for col, name in zip(dist_cols, names_disp)} df = df.rename(rename_cols, axis=1) out_path = "knn_dist_combine_{}".format(key) df_io.data_frames_to_csv(df, out_path) plot_2d.plot_lines( out_path, "point", rename_cols.values(), df=df, show=show, title=config.plot_labels[config.PlotLabels.TITLE])
[docs] def cluster_dbscan_metrics(labels): """Calculate basic metrics for DBSCAN. Args: labels (:obj:`np.ndarray`): Cluster labels. Returns: int, int, int: Tuple of number of clusters, number of noise blobs, and number of blobs contained within the largest cluster. """ lbl_unique, lbl_counts = np.unique( labels[labels != -1], return_counts=True) num_clusters = len(lbl_unique) # number of blobs in largest cluster num_largest = np.nan if len(lbl_counts) == 0 else np.amax(lbl_counts) # number of blobs not in a cluster num_noise = np.sum(labels == -1) return num_clusters, num_noise, num_largest
[docs] class ClusterByLabel(object): blobs = None
[docs] @classmethod def cluster_by_label(cls, blobs, labels_img_np, blobs_lbl_scaling, blobs_iso_scaling, all_labels=False): coord_scaled = ontology.scale_coords(blobs, blobs_lbl_scaling) blobs_lbls = ontology.get_label_ids_from_position( coord_scaled, labels_img_np) blobs = np.multiply(blobs[:, :3], blobs_iso_scaling) blobs_clus = np.zeros((len(blobs), 5), dtype=int) blobs_clus[:, :3] = blobs blobs_clus[:, 3] = blobs_lbls cls.blobs = blobs_clus print(np.unique(blobs_clus[:, 3])) print(cls.blobs) # TODO: shift to separate func once load blobs without req labels img label_ids = np.unique(labels_img_np) cluster_settings = config.atlas_profile[ profiles.RegKeys.METRICS_CLUSTER] eps = cluster_settings[profiles.RegKeys.DBSCAN_EPS] minpts = cluster_settings[profiles.RegKeys.DBSCAN_MINPTS] if all_labels: # cluster all labels together # TODO: n_jobs appears to be ignored despite reported fixes _, labels = cls.cluster_within_label(None, eps, minpts, -1) cls.blobs[:, 4] = labels else: # cluster by individual label pool = chunking.get_mp_pool() pool_results = [] for label_id in label_ids: # add rotation argument if necessary pool_results.append( pool.apply_async( cls.cluster_within_label, args=(label_id, eps, minpts, None))) for result in pool_results: label_id, labels = result.get() if labels is not None: cls.blobs[cls.blobs[:, 3] == label_id, 4] = labels pool.close() pool.join() cls.blobs[:, :3] = np.divide(blobs[:, :3], blobs_iso_scaling) return cls.blobs
[docs] @classmethod def cluster_within_label(cls, label_id, eps, minpts, n_jobs): blobs = cls.blobs if label_id is not None: blobs = blobs[blobs[:, 3] == label_id] clus_lbls = None if len(blobs) > 0: clusters = cluster.DBSCAN( eps=eps, min_samples=minpts, leaf_size=30, n_jobs=n_jobs).fit(blobs) num_clusters, num_noise, num_largest = cluster_dbscan_metrics( clusters.labels_) print("label {}: num clusters: {}, noise blobs: {}, " "largest cluster: {}" .format(label_id, num_clusters, num_noise, num_largest)) clus_lbls = clusters.labels_ return label_id, clus_lbls
[docs] def cluster_blobs(img_path, suffix=None): """Cluster blobs and save to Numpy archive. Args: img_path (str): Base path from which registered labels and blobs files will be found and output blobs file save location will be constructed. suffix (str): Suffix for ``path``; defaults to None. Returns: """ if not cluster: raise ImportError( config.format_import_err("scikit-learn", task="blob clustering")) mod_path = img_path if suffix is not None: mod_path = libmag.insert_before_ext(img_path, suffix) labels_img_np = sitk_io.load_registered_img( mod_path, config.RegNames.IMG_LABELS.value) blobs = detector.Blobs().load_blobs(np_io.img_to_blobs_path(img_path)) scaling, res = np_io.find_scaling( img_path, labels_img_np.shape, load_size=config.atlas_profile["target_size"]) if blobs is None: libmag.warn("unable to load nuclei coordinates") return # append label IDs to blobs and scale to make isotropic blobs_clus = ClusterByLabel.cluster_by_label( blobs.blobs[:, :3], labels_img_np, scaling, res) print(blobs_clus) out_path = libmag.combine_paths(mod_path, config.SUFFIX_BLOB_CLUSTERS) np.save(out_path, blobs_clus)