Source code for magmap.io.cli

#!/usr/bin/env python
# Command line parsing and setup
# Author: David Young, 2017, 2020
"""Command line parser and and environment setup for MagellanMapper.

This module can be run either as a script to work in headless mode or 
loaded and initialized by calling main(). 

Note on dimensions order: User-defined dimension 
variables are generally given in (x, y, z) order as per normal
convention, but otherwise dimensions are generally in (z, y, x) for
consistency with microscopy order and ease of processing stacks by z.

Examples:
    Launch in headless mode with the given file at a particular size and 
    offset::
        
        $ python -m magmap.cli --img /path/to/file.czi --offset 30,50,205 \\
            --size 150,150,10

For a table of command-line arguments and their usage, see:
https://github.com/sanderslab/magellanmapper/blob/master/docs/cli.md

"""

import argparse
import dataclasses
import pprint
from enum import Enum
import logging
import os
import sys
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, \
    Union

import numpy as np

from magmap.atlas import register, transformer
from magmap.cloud import notify
from magmap.cv import chunking, classifier, colocalizer, stack_detect
from magmap.io import df_io, export_stack, importer, libmag, naming, np_io, \
    sqlite
from magmap.plot import colormaps, plot_2d
from magmap.settings import atlas_prof, config, grid_search_prof, logs, \
    prefs_prof, roi_prof
from magmap.stats import mlearn

_logger = config.logger.getChild(__name__)


def _parse_coords(arg: str, rev: bool = False) -> List[Tuple[int, ...]]:
    # parse a list of strings into 3D coordinates
    coords = []  # copy list to avoid altering the arg itself
    for coord in arg:
        if not coord: continue
        coord_split = coord.split(",")
        if len(coord_split) >= 3:
            coord = tuple(int(i) for i in coord_split)
            if rev:
                coord = coord[::-1]
            coords.append(coord)
        else:
            print("Coordinates ({}) should be given as 3 values (x, y, z)"
                  .format(coord))
    return coords


def _parse_none(
        arg: Any, fn: Optional[Callable] = None,
        none: Sequence[str] = ("none", "0")) -> Any:
    """Parse arguments with support for conversion to None.
    
    Args:
        arg: Argument to potentially convert.
        fn: Function to apply to ``arg`` if not converted; defaults to None.
        none: Sequence of strings to convert to None.

    Returns:
        None if ``arg`` is "none" or "0"; otherwise, returns ``fn(arg)`` if
        ``fn`` is given, or ``arg`` unchanged.

    """
    if not libmag.is_seq(none):
        none = [none]
    if arg.lower() in none:
        return None
    return arg if fn is None else fn(arg)


def _is_arg_true(arg: str) -> bool:
    return arg.lower() == "true" or arg == "1"


[docs] def args_with_dict(args: List[str]) -> List[Union[Dict[str, Any], str, int]]: """Parse arguments list with optional arguments given as dictionary-like elements. Args: args: List of arguments, which can be single values or "=" delimited values. Single values will be stored in the same order, while delimited entries will be entered sequentially into a dictionary. Entries can also be comma-delimited to specify lists. Returns: List of arguments ordered first with single-value entries in the same order in which they were entered, followed by a dictionary with all equals-delimited entries, also in the same order as entered. Entries that contain commas will be split into comma-delimited lists. All values will be converted to ints if possible. """ parsed = [] args_dict = {} for arg in args: arg_split = arg.split("=") for_dict = len(arg_split) > 1 vals = arg_split[1] if for_dict else arg vals_split = vals.split(",") if len(vals_split) > 1: vals = vals_split vals = libmag.get_int(vals) if for_dict: args_dict[arg_split[0]] = vals else: parsed.append(vals) parsed.append(args_dict) return parsed
[docs] def args_to_dict( args: List[str], keys: Union[Type[Enum], Type["config.DataClass"]], args_dict: Optional[Dict[Enum, Any]] = None, sep_args: str = "=", sep_vals: str = ",", default: Optional[Any] = None ) -> Union[Dict[Enum, Any], Type["config.DataClass"]]: """Parse positional and keyword-based arguments to an enum-keyed dictionary. Args: args: List of arguments with positional values followed by ``sep_args``-delimited values. Positional values will be entered in the existing order of ``keys`` based on member values, while keyword-based values will be entered if a member corresponding to the keyword exists. Entries can also be ``sep_vals``-delimited to specify lists. keys: Enum class or data class instance whose fields will be used as keys for dictionary. Enum values are assumed to range from 1 to number of members as output by the default Enum functional API. args_dict: Dictionary to be filled or updated with keys from ``keys_enum``. Defaults to None, which will assign an empty dict. Ignored if ``keys_class`` is a data class instance, which will be updated instead. sep_args: Separator between arguments and values; defaults to "=". sep_vals: Separator within values; defaults to ",". default: Default value for each argument. Effectively turns off positional argument assignments since all args become ``<keyword>=<default>``. Defaults to None. If a str, will undergo splitting by ``sep_vals``. Returns: Dictionary or data class corresponding to ``keys``, filled with arguments. Values that contain commas will be split into comma-delimited lists. All values will be converted to ints if possible. """ is_data = dataclasses.is_dataclass(keys) if is_data: # use fields as keys keys_data = [f.name for f in dataclasses.fields(keys)] nkeys = len(keys_data) out = keys else: # use Enum members as keys keys_data = None nkeys = len(keys) out = {} if args_dict is None else args_dict by_position = True for i, arg in enumerate(args): arg_split = arg.split(sep_args) if default and len(arg_split) < 2: # add default value unless another value is given arg_split.append(default) len_arg_split = len(arg_split) # assume by position until any keyword given by_position = by_position and len_arg_split < 2 key = None vals = arg if by_position: # positions are based on enum vals, assumed to range from # 1 to num of members n = i + 1 if n > nkeys: _logger.warn( "No further parameters in '%s' to assign '%s' by " "position, skipping", keys, arg) continue key = keys_data[n] if is_data else keys(n) elif len_arg_split < 2: _logger.warn( "Parameter '%s' does not contain a keyword, skipping", arg) else: # assign based on keyword if its equivalent enum exists key_str = arg_split[0] vals = arg_split[1] try: key = key_str.lower() if is_data else keys[key_str.upper()] except KeyError: _logger.warn( "Unable to find '%s' in %s, skipping", key_str, keys) continue if key: if isinstance(vals, str): # split delimited strings vals_split = vals.split(sep_vals) if len(vals_split) > 1: vals = vals_split # cast to numeric type if possible vals = libmag.get_int(vals) # assign to found enum to data class if is_data: setattr(out, key, vals) else: out[key] = vals return out
def _get_args_dict_help( msg: str, keys: Union[Type[Enum], Type["config.DataClass"]]) -> str: """Get the help message for command-line arguments that are converted to a dictionary. Args: msg: Message to prepend to the help message. keys: Keys as an Enumeration. Returns: Help message with available keys. """ if dataclasses.is_dataclass(keys): # get all fields from data class names = [f.name for f in dataclasses.fields(keys)] else: # use enum names names = libmag.enum_names_aslist(keys) return (f"{msg} Available keys, which follow this order positionally " f"until the first key=value pair is given: {names}")
[docs] def process_cli_args(): """Parse command-line arguments. Typically stores values as :mod:`magmap.settings.config` attributes. """ parser = argparse.ArgumentParser( description="Setup environment for MagellanMapper") parser.add_argument( "--version", action="store_true", help="Show version information and exit") # image specification arguments # image path(s) specified as an optional argument; takes precedence # over positional argument parser.add_argument( "--img", nargs="*", default=None, help="Main image path(s); after import, the filename is often " "given as the original name without its extension") # alternatively specified as the first and only positional parameter # with as many arguments as desired parser.add_argument( "img_paths", nargs="*", default=None, help="Main image path(s); can also be given as --img, which takes " "precedence over this argument") parser.add_argument( "--meta", nargs="*", help="Metadata path(s), which can be given as multiple files " "corresponding to each image") parser.add_argument( "--prefix", nargs="*", type=str, help="Path prefix(es), typically used as the base path for file output") parser.add_argument( "--prefix_out", nargs="*", type=str, help="Path prefix(es), typically used as the base path for file output " "when --prefix modifies the input path") parser.add_argument( "--suffix", nargs="*", type=str, help="Path suffix(es), typically inserted just before the extension") parser.add_argument("--channel", nargs="*", type=int, help="Channel index") parser.add_argument("--series", help="Series index") parser.add_argument( "--subimg_offset", nargs="*", help="Sub-image offset in x,y,z") parser.add_argument( "--subimg_size", nargs="*", help="Sub-image size in x,y,z") parser.add_argument("--offset", nargs="*", help="ROI offset in x,y,z") parser.add_argument("--size", nargs="*", help="ROI size in x,y,z") parser.add_argument("--db", help="Database path") parser.add_argument( "--cpus", help="Maximum number of CPUs/processes to use for multiprocessing " "tasks. Use \"none\" or 0 to auto-detect this number (default).") parser.add_argument( "--load", nargs="*", help="Load associated data files; see config.LoadData for settings") # task arguments parser.add_argument( "--proc", nargs="*", help=_get_args_dict_help( "Image processing mode; see config.ProcessTypes for keys " "and config.PreProcessKeys for PREPROCESS values", config.ProcessTypes)) parser.add_argument( "--register", type=str.lower, choices=libmag.enum_names_aslist(config.RegisterTypes), help="Image registration task") parser.add_argument( "--df", type=str.lower, choices=libmag.enum_names_aslist(config.DFTasks), help="Data frame task") parser.add_argument( "--plot_2d", type=str.lower, choices=libmag.enum_names_aslist(config.Plot2DTypes), help="2D plot task; see config.Plot2DTypes") parser.add_argument("--ec2_start", nargs="*", help="AWS EC2 instance start") parser.add_argument("--ec2_list", nargs="*", help="AWS EC2 instance list") parser.add_argument( "--ec2_terminate", nargs="*", help="AWS EC2 instance termination") parser.add_argument( "--notify", nargs="*", help="Notification message URL, message, and attachment strings") # profile arguments parser.add_argument( "--roi_profile", nargs="*", help="ROI profile, which can be separated by underscores " "for multiple profiles and given as paths to custom profiles " "in YAML format. Multiple profile groups can be given, which " "will each be applied to the corresponding channel. See " "docs/settings.md for more details.") parser.add_argument( "--atlas_profile", help="Atlas profile, which can be separated by underscores " "for multiple profiles and given as paths to custom profiles " "in YAML format. See docs/settings.md for more details.") parser.add_argument( "--grid_search", help="Grid search hyperparameter tuning profile(s), which can be " "separated by underscores for multiple profiles and given as " "paths to custom profiles in YAML format. See docs/settings.md " "for more details.") parser.add_argument( "--theme", nargs="*", type=str.lower, choices=libmag.enum_names_aslist(config.Themes), help="UI theme, which can be given as multiple themes to apply " "on top of one another") # grouped arguments parser.add_argument( "--truth_db", nargs="*", help="Truth database; see config.TruthDB for settings and " "config.TruthDBModes for modes") parser.add_argument( "--labels", nargs="*", help=_get_args_dict_help( "Atlas labels; see config.AtlasLabels.", config.AtlasLabels)) parser.add_argument( "--transform", nargs="*", help=_get_args_dict_help( "Image transformations; see config.Transforms.", config.Transforms)) parser.add_argument( "--reg_suffixes", nargs="*", help=_get_args_dict_help( "Registered image suffixes; see config.RegSuffixes for keys " "and config.RegNames for values", config.RegSuffixes)) parser.add_argument( "--plot_labels", nargs="*", help=_get_args_dict_help( "Plot label customizations; see config.PlotLabels ", config.PlotLabels)) parser.add_argument( "--set_meta", nargs="*", help="Set metadata values; see config.MetaKeys for settings") parser.add_argument( "--classifier", nargs="*", help=_get_args_dict_help( "Classifier values; see config.ClassifierKeys for settings.", config.ClassifierData)) # image and figure display arguments parser.add_argument( "--plane", type=str.lower, choices=config.PLANE, help="Planar orientation") parser.add_argument( "--show", nargs="?", const="1", help="If applicable, show images after completing the given task") parser.add_argument( "--alphas", help="Alpha opacity levels, which can be comma-delimited for " "multichannel images") parser.add_argument( "--vmin", help="Minimum intensity levels, which can be comma-delimited " "for multichannel images") parser.add_argument( "--vmax", help="Maximum intensity levels, which can be comma-delimited " "for multichannel images") parser.add_argument( "--rgb", action="store_true", help="Open images as RGB(A) color images") parser.add_argument("--seed", help="Random number generator seed") # export arguments parser.add_argument( "--save_subimg", action="store_true", help="Save sub-image as separate file") parser.add_argument("--slice", help="Slice given as start,stop,step") parser.add_argument("--delay", help="Animation delay in ms") parser.add_argument( "--savefig", help="Extension for saved figures") parser.add_argument( "--groups", nargs="*", help="Group values corresponding to each image") parser.add_argument( "-v", "--verbose", nargs="*", help=_get_args_dict_help( "Verbose output to assist with debugging; see config.Verbosity.", config.Verbosity)) # only parse recognized arguments to avoid error for unrecognized ones args, args_unknown = parser.parse_known_args() # set up application directories user_dir = config.user_app_dirs.user_data_dir if not os.path.isdir(user_dir): # make application data directory if os.path.exists(user_dir): # backup any non-directory file libmag.backup_file(user_dir) os.makedirs(user_dir) if args.verbose is not None: # verbose mode and logging setup config.verbose = True config.verbosity = args_to_dict( args.verbose, config.Verbosity, config.verbosity) if config.verbosity[config.Verbosity.LEVEL] is None: # default to debug mode if any verbose flag is set without level config.verbosity[config.Verbosity.LEVEL] = logging.DEBUG logs.update_log_level( config.logger, config.verbosity[config.Verbosity.LEVEL]) # print longer Numpy arrays for debugging np.set_printoptions(linewidth=200, threshold=10000) _logger.info("Set verbose to %s", config.verbosity) # set up logging to given file unless explicitly given an empty string log_path = config.verbosity[config.Verbosity.LOG_PATH] if log_path != "": if log_path is None: log_path = os.path.join( config.user_app_dirs.user_data_dir, "out.log") # log to file config.log_path = logs.add_file_handler(config.logger, log_path) # redirect standard out/error to logging sys.stdout = logs.LogWriter(config.logger.info) sys.stderr = logs.LogWriter(config.logger.error) # load preferences file config.prefs = prefs_prof.PrefsProfile() config.prefs.add_profiles(str(config.PREFS_PATH)) if args.version: # print version info and exit _logger.info(f"{config.APP_NAME}-{libmag.get_version(True)}") shutdown() # log the app launch path path_launch = (sys._MEIPASS if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS") else sys.argv[0]) _logger.info(f"Launched MagellanMapper from {path_launch}") if args.img is not None or args.img_paths: # set image file path and convert to basis for additional paths config.filenames = args.img if args.img else args.img_paths config.filename = config.filenames[0] print("Set filenames to {}, current filename {}" .format(config.filenames, config.filename)) if args.meta is not None: # set metadata paths config.metadata_paths = args.meta print("Set metadata paths to", config.metadata_paths) config.metadatas = [] for path in config.metadata_paths: # load metadata to dictionary md, _ = importer.load_metadata(path, assign=False) config.metadatas.append(md) if args.channel is not None: # set the channels config.channel = args.channel print("Set channel to {}".format(config.channel)) config.series_list = [config.series] # list of series if args.series is not None: series_split = args.series.split(",") config.series_list = [] for ser in series_split: ser_split = ser.split("-") if len(ser_split) > 1: ser_range = np.arange(int(ser_split[0]), int(ser_split[1]) + 1) config.series_list.extend(ser_range.tolist()) else: config.series_list.append(int(ser_split[0])) config.series = config.series_list[0] print("Set to series_list to {}, current series {}".format( config.series_list, config.series)) if args.savefig is not None: # save figure with file type of this extension; remove leading period config.savefig = _parse_none(args.savefig.lstrip(".")) print("Set savefig extension to {}".format(config.savefig)) # parse sub-image offsets and sizes; # expects x,y,z input but stores as z,y,x by convention if args.subimg_offset is not None: config.subimg_offsets = _parse_coords(args.subimg_offset, True) print("Set sub-image offsets to {} (z,y,x)" .format(config.subimg_offsets)) if args.subimg_size is not None: config.subimg_sizes = _parse_coords(args.subimg_size, True) print("Set sub-image sizes to {} (z,y,x)" .format(config.subimg_sizes)) # parse ROI offsets and sizes, which are relative to any sub-image; # expects x,y,z input and output if args.offset is not None: config.roi_offsets = _parse_coords(args.offset) if config.roi_offsets: config.roi_offset = config.roi_offsets[0] print("Set ROI offsets to {}, current offset {} (x,y,z)" .format(config.roi_offsets, config.roi_offset)) if args.size is not None: config.roi_sizes = _parse_coords(args.size) if config.roi_sizes: config.roi_size = config.roi_sizes[0] print("Set ROI sizes to {}, current size {} (x,y,z)" .format(config.roi_sizes, config.roi_size)) if args.cpus is not None: # set maximum number of CPUs config.cpus = _parse_none(args.cpus.lower(), int) print("Set maximum number of CPUs for multiprocessing tasks to", config.cpus) if args.load is not None: # flag loading data sources with default sub-arg indicating that the # data should be loaded from a default path; otherwise, load from # path given by the sub-arg; change delimiter to allow paths with "," config.load_data = args_to_dict( args.load, config.LoadData, config.load_data, sep_vals="|", default=True) print("Set to load the data types: {}".format(config.load_data)) # set up main processing mode if args.proc is not None: config.proc_type = args_to_dict( args.proc, config.ProcessTypes, config.proc_type, default=True) print("Set main processing tasks to:", config.proc_type) if args.set_meta is not None: # set individual metadata values, currently used for image import # TODO: take precedence over loaded metadata archives config.meta_dict = args_to_dict( args.set_meta, config.MetaKeys, config.meta_dict, sep_vals="|") print("Set metadata values to {}".format(config.meta_dict)) res = config.meta_dict[config.MetaKeys.RESOLUTIONS] if res: # set image resolutions, taken as a single set of x,y,z and # converting to a nested list of z,y,x res_split = res.split(",") if len(res_split) >= 3: res_float = tuple(float(i) for i in res_split)[::-1] config.resolutions = [res_float] print("Set resolutions to {}".format(config.resolutions)) else: res_float = None print("Resolution ({}) should be given as 3 values (x,y,z)" .format(res)) # store single set of resolutions, similar to input config.meta_dict[config.MetaKeys.RESOLUTIONS] = res_float mag = config.meta_dict[config.MetaKeys.MAGNIFICATION] if mag: # set objective magnification config.magnification = mag print("Set magnification to {}".format(config.magnification)) zoom = config.meta_dict[config.MetaKeys.ZOOM] if zoom: # set objective zoom config.zoom = zoom print("Set zoom to {}".format(config.zoom)) shape = config.meta_dict[config.MetaKeys.SHAPE] if shape: # parse shape, storing only in dict config.meta_dict[config.MetaKeys.SHAPE] = [ int(n) for n in shape.split(",")[::-1]] # set up ROI and register profiles setup_roi_profiles(args.roi_profile) setup_atlas_profiles(args.atlas_profile) setup_grid_search_profiles(args.grid_search) if args.plane is not None: config.plane = args.plane print("Set plane to {}".format(config.plane)) if args.save_subimg: config.save_subimg = args.save_subimg print("Set to save the sub-image") if args.labels: # set up atlas labels setup_labels(args.labels) if args.transform is not None: # image transformations such as flipping, rotation config.transform = args_to_dict( args.transform, config.Transforms, config.transform) print("Set transformations to {}".format(config.transform)) if args.register: # register type to process in register module config.register_type = args.register print("Set register type to {}".format(config.register_type)) if args.df: # data frame processing task config.df_task = args.df print("Set data frame processing task to {}".format(config.df_task)) if args.plot_2d: # 2D plot type to process in plot_2d module config.plot_2d_type = args.plot_2d print("Set plot_2d type to {}".format(config.plot_2d_type)) if args.slice: # specify a generic slice by command-line, assuming same order # of arguments as for slice built-in function and interpreting # "none" string as None config.slice_vals = args.slice.split(",") config.slice_vals = [_parse_none(val.lower(), int, "none") for val in config.slice_vals] print("Set slice values to {}".format(config.slice_vals)) if args.delay: config.delay = int(args.delay) print("Set delay to {}".format(config.delay)) if args.show: # show images after task is performed, if supported config.show = _is_arg_true(args.show) print("Set show to {}".format(config.show)) if args.groups: config.groups = args.groups print("Set groups to {}".format(config.groups)) if args.ec2_start is not None: # start EC2 instances config.ec2_start = args_with_dict(args.ec2_start) print("Set ec2 start to {}".format(config.ec2_start)) if args.ec2_list: # list EC2 instances config.ec2_list = args_with_dict(args.ec2_list) print("Set ec2 list to {}".format(config.ec2_list)) if args.ec2_terminate: config.ec2_terminate = args.ec2_terminate print("Set ec2 terminate to {}".format(config.ec2_terminate)) if args.notify: notify_len = len(args.notify) if notify_len > 0: config.notify_url = args.notify[0] print("Set notification URL to {}".format(config.notify_url)) if notify_len > 1: config.notify_msg = args.notify[1] print("Set notification message to {}".format(config.notify_msg)) if notify_len > 2: config.notify_attach = args.notify[2] print("Set notification attachment path to {}" .format(config.notify_attach)) if args.prefix is not None: # path input/output prefixes config.prefixes = args.prefix config.prefix = config.prefixes[0] print("Set path prefixes to {}".format(config.prefixes)) if args.prefix_out is not None: # path output prefixes config.prefixes_out = args.prefix_out config.prefix_out = config.prefixes_out[0] print("Set path prefixes to {}".format(config.prefixes_out)) if args.suffix is not None: # path suffixes config.suffixes = args.suffix config.suffix = config.suffixes[0] print("Set path suffixes to {}".format(config.suffixes)) if args.alphas: # specify alpha levels config.alphas = [float(val) for val in args.alphas.split(",")] print("Set alphas to", config.alphas) if args.vmin: # specify vmin levels config.vmins = [ libmag.get_int(val) for val in args.vmin.split(",")] print("Set vmins to", config.vmins) if args.vmax: # specify vmax levels and copy to vmax overview used for plotting # and updated for normalization config.vmaxs = [ libmag.get_int(val) for val in args.vmax.split(",")] config.vmax_overview = list(config.vmaxs) print("Set vmaxs to", config.vmaxs) if args.rgb: # flag to open images as RGB config.rgb = args.rgb _logger.info("Set RGB to %s", config.rgb) if args.reg_suffixes is not None: # specify suffixes of registered images to load config.reg_suffixes = args_to_dict( args.reg_suffixes, config.RegSuffixes, config.reg_suffixes) print("Set registered image suffixes to {}".format(config.reg_suffixes)) if args.seed: # specify random number generator seed config.seed = int(args.seed) print("Set random number generator seed to", config.seed) if args.plot_labels is not None: # specify general plot labels config.plot_labels = args_to_dict( args.plot_labels, config.PlotLabels, config.plot_labels) print("Set plot labels to {}".format(config.plot_labels)) if args.theme is not None: # specify themes, currently applied to Matplotlib elements theme_names = [] for theme in args.theme: # add theme enum if found theme_enum = libmag.get_enum(theme, config.Themes) if theme_enum: config.rc_params.append(theme_enum) theme_names.append(theme_enum.name) print("Set to use themes to {}".format(theme_names)) # set up Matplotlib styles/themes plot_2d.setup_style() if args.classifier is not None: # classifier settings args_to_dict(args.classifier, config.classifier) print("Set classifier to {}".format(config.classifier)) if args.db: # set main database path to user arg config.db_path = args.db print("Set database name to {}".format(config.db_path)) else: # set default path config.db_path = os.path.join(user_dir, config.db_path) if args.truth_db: # set settings for separate database of "truth blobs" config.truth_db_params = args_to_dict( args.truth_db, config.TruthDB, config.truth_db_params, sep_vals="|") mode = config.truth_db_params[config.TruthDB.MODE] config.truth_db_mode = libmag.get_enum(mode, config.TruthDBModes) libmag.printv(config.truth_db_params) print("Mapped \"{}\" truth_db mode to {}" .format(mode, config.truth_db_mode)) # notify user of full args list, including unrecognized args _logger.info(f"All command-line arguments:\n{pprint.pformat(sys.argv)}") if args_unknown: _logger.info( f"The following command-line arguments were unrecognized and " f"ignored:\n{pprint.pformat(args_unknown)}")
[docs] def setup_image( path: str, series: Optional[int] = None, proc_tasks: Optional[Dict["config.ProcessTypes", Any]] = None): """Set up the main image from CLI args and process any tasks. Args: path: Image path. series: Image series, such as a tile; defaults to None. proc_tasks: Dictionary of processing tasks; defaults to None. """ # deconstruct user-supplied image filename filename, offset, size, reg_suffixes, suffix = \ importer.deconstruct_img_name(path) set_subimg, _ = importer.parse_deconstructed_name( filename, offset, size, reg_suffixes, suffix) if not set_subimg: # sub-image parameters set in filename takes precedence for # the loaded image, but fall back to user-supplied args offset = config.subimg_offsets[0] if config.subimg_offsets else None size = config.subimg_sizes[0] if config.subimg_sizes else None if proc_tasks: for proc_task, proc_val in proc_tasks.items(): # set up image for the given task np_io.setup_images( filename, series, offset, size, proc_task, fallback_main_img=False) process_file( filename, proc_task, proc_val, series, offset, size, config.roi_offsets[0] if config.roi_offsets else None, config.roi_sizes[0] if config.roi_sizes else None) else: # set up image without a task specified, eg for display np_io.setup_images(filename, series, offset, size)
[docs] def process_proc_tasks( path: Optional[str] = None, series_list: Optional[Sequence[int]] = None ) -> Optional[Dict["config.ProcessTypes", Any]]: """Apply processing tasks. Args: path: Base path to main image file; defaults to None, in which case :attr:`config.filename` will be used. series_list: Sequence of images series, such as tiles; defaults to None. Returns: Dictionary of set processing types. """ if path is None: path = config.filename if not path: print("No image filename set for processing files, skipping") return None if series_list is None: series_list = config.series_list # filter out unset tasks proc_tasks = {k: v for k, v in config.proc_type.items() if v} for series in series_list: # process files for each series, typically a tile within a # microscopy image set or a single whole image setup_image(path, series, proc_tasks) return proc_tasks
[docs] def process_tasks(): """Process command-line tasks. Perform tasks set by the ``--proc`` parameter or any other entry point, such as ``--register`` tasks. Only the first identified task will be performed. """ # if command-line driven task specified, start task and shut down if config.register_type: register.main() elif config.notify_url: notify.main() elif config.plot_2d_type: plot_2d.main() elif config.df_task: df_io.main() elif config.grid_search_profile: _grid_search(config.series_list) elif config.ec2_list or config.ec2_start or config.ec2_terminate: # defer importing AWS module to avoid making its dependencies # required for MagellanMapper from magmap.cloud import aws aws.main() else: # processing tasks proc_tasks = process_proc_tasks() if not proc_tasks or config.ProcessTypes.LOAD in proc_tasks: # do not shut down since not a command-line task or if loading files return shutdown()
[docs] def setup_dbs(): """Set up databases for the given image file. Only sets up each database if it has not been set up already. """ # prep filename filename_base = None if config.filename: filename_base = importer.filename_to_base( config.filename, config.series) # get any user-supplied truth database path, falling back to name based # on filename or default name truth_db_path = config.truth_db_params[config.TruthDB.PATH] user_dir = config.user_app_dirs.user_data_dir truth_db_name_base = filename_base if filename_base else os.path.join( user_dir, sqlite.DB_NAME_BASE) if config.truth_db_mode is config.TruthDBModes.VIEW: # loads truth DB as a separate database in parallel with the given # editable database, with name based on filename by default unless # truth DB name explicitly given path = truth_db_path if truth_db_path else truth_db_name_base try: sqlite.load_truth_db(path) except FileNotFoundError as e: print(e) print("Could not load truth DB from current image path") elif config.truth_db_mode is config.TruthDBModes.VERIFY: if not config.verified_db: # creates a new verified DB to store all ROC results config.verified_db = sqlite.ClrDB() config.verified_db.load_db( os.path.join(user_dir, sqlite.DB_NAME_VERIFIED), True) if truth_db_path: # load truth DB path to verify against if explicitly given try: sqlite.load_truth_db(truth_db_path) except FileNotFoundError as e: print(e) print("Could not load truth DB from {}" .format(truth_db_path)) elif config.truth_db_mode is config.TruthDBModes.VERIFIED: # loads verified DB as the main DB, which includes copies of truth # values with flags for whether they were detected path = os.path.join(user_dir, sqlite.DB_NAME_VERIFIED) if truth_db_path: path = truth_db_path try: config.db = sqlite.ClrDB() config.db.load_db(path) config.verified_db = config.db except FileNotFoundError as e: print(e) print("Could not load verified DB from {}" .format(sqlite.DB_NAME_VERIFIED)) elif config.truth_db_mode is config.TruthDBModes.EDIT: # loads truth DB as the main database for editing rather than # loading as a truth database config.db_path = truth_db_path if not config.db_path: config.db_path = "{}{}".format( os.path.basename(truth_db_name_base), sqlite.DB_SUFFIX_TRUTH) print("Editing truth database at {}".format(config.db_path)) if config.db is None: # load the main database config.db = sqlite.ClrDB() config.db.load_db(None, False)
[docs] def main(process_args_only: bool = False, skip_dbs: bool = False): """Starts the visualization GUI. Processes command-line arguments. Args: process_args_only: Processes command-line arguments and returns; defaults to False. skip_dbs: True to skip loading databases; defaults to False. """ # parse command-line arguments process_cli_args() if not skip_dbs: # load databases setup_dbs() # set multiprocessing start method chunking.set_mp_start_method() if process_args_only: return # process tasks process_tasks()
[docs] def setup_roi_profiles(roi_profiles_names: List[str]): """Set up ROI profiles. If a profile is None, only a default set of profile settings will be generated. Also sets up colormaps based on ROI profiles. Any previously set up profile will be replaced. Args: roi_profiles_names: Sequence of ROI and atlas profiles names to use for the corresponding channel. """ # initialize ROI profile settings and update with modifiers config.roi_profile = roi_prof.ROIProfile() config.roi_profiles = [config.roi_profile] if roi_profiles_names is not None: for i, roi_prof_name in enumerate(roi_profiles_names): _logger.debug("Updating ROI profile for channel %s", i) if i == 0: settings = config.roi_profile else: settings = roi_prof.ROIProfile() config.roi_profiles.append(settings) settings.add_profiles(roi_prof_name) for i, prof in enumerate(config.roi_profiles): if i == 0: _logger.info( "Set default (channel 0) ROI profile: %s", prof[prof.NAME_KEY]) else: _logger.info( "Added channel %s ROI profile: %s", i, prof[prof.NAME_KEY]) colormaps.setup_colormaps(np_io.get_num_channels(config.image5d))
[docs] def setup_atlas_profiles(atlas_profiles_names: str, reset: bool = True): """Set up atlas profiles. If a profile is None, only a default set of profile settings will be generated. Any previously set up profile will be replaced. Args: atlas_profiles_names: Atlas profiles names. reset: True to reset profiles before setting profiles from ``atlas_profile_names``; defaults to True. """ # initialize atlas profile and update with modifiers if reset: config.atlas_profile = atlas_prof.AtlasProfile() if atlas_profiles_names is not None: config.atlas_profile.add_profiles(atlas_profiles_names) _logger.info( "Set atlas profile to %s", config.atlas_profile[config.atlas_profile.NAME_KEY])
[docs] def setup_grid_search_profiles(grid_search_profiles_names: str): """Setup grid search profiles. If a profile is None, only a default set of profile settings will be generated. Any previously set up profile will be replaced. Args: grid_search_profiles_names: Grid search profiles names. """ if grid_search_profiles_names: # parse grid search profiles config.grid_search_profile = grid_search_prof.GridSearchProfile() config.grid_search_profile.add_profiles(grid_search_profiles_names) _logger.info( "Set grid search profile to %s", config.grid_search_profile[config.grid_search_profile.NAME_KEY]) _logger.debug(config.grid_search_profile)
[docs] def update_profiles(): """Update profiles if any profile file has been modified since it was last loaded. Profiles in both :attr:`config.process_settings_list` and :attr:`config.register_settings_list` will be checked to update. """ for i, prof in enumerate(config.roi_profiles): prof.refresh_profile(True) config.atlas_profile.refresh_profile(True)
[docs] def setup_labels(labels_arg: List[str]): """Set up atlas labels. Args: labels_arg: Path to labels reference file, such as a labels ontology file. """ # atlas labels as positional or dictionary-like args config.atlas_labels = args_to_dict( labels_arg, config.AtlasLabels, config.atlas_labels) config.load_labels = config.atlas_labels[config.AtlasLabels.PATH_REF] config.labels_level = config.atlas_labels[config.AtlasLabels.LEVEL] print("Set labels to {}".format(config.atlas_labels))
def _detect_subimgs( path: str, series: int, subimg_offsets: List[List[int]], subimg_sizes: List[List[int]] ) -> Tuple[Union[np.ndarray, Any], List[str]]: """Detect blobs in an image across sub-image offsets. Args: path: Path to image from which MagellanMapper-style paths will be generated. series: Image series number. subimg_offsets: Nested list of sub-image offset sets given as ``[[offset_z1, offset_y1, offset_x1], ...]``. subimg_sizes: Nested list of sub-image size sets given as ``[[offset_z1, offset_y1, offset_x1], ...]`` and corresponding to ``subimg_offsets``. Returns: Summed stats array and concatenated summaries. """ stat = np.zeros(3) # use whole image if sub-image parameters are not set if subimg_offsets is None: subimg_offsets = [None] if subimg_sizes is None: subimg_sizes = [None] roi_sizes_len = len(subimg_sizes) summaries = [] for i in range(len(subimg_offsets)): size = (subimg_sizes[i] if roi_sizes_len > 1 else subimg_sizes[0]) np_io.setup_images(path, series, subimg_offsets[i], size) stat_roi, fdbk, _ = stack_detect.detect_blobs_stack( importer.filename_to_base(path, series), subimg_offsets[i], size) if stat_roi is not None: stat = np.add(stat, stat_roi) summaries.append( "Offset {}:\n{}".format(subimg_offsets[i], fdbk)) return stat, summaries def _grid_search(series_list: List[int]): # grid search(es) for the specified hyperparameter groups if not config.filename: print("No image filename set for grid search, skipping") return for series in series_list: # process each series, typically a tile within an microscopy image # set or a single whole image stats_dict = mlearn.grid_search( config.grid_search_profile.hyperparams, _detect_subimgs, config.filename, series, config.subimg_offsets, config.subimg_sizes) parsed_dict, stats_df = mlearn.parse_grid_stats(stats_dict) # plot ROC curve plot_2d.plot_roc(stats_df, config.show)
[docs] def process_file( path: str, proc_type: Enum, proc_val: Optional[Any] = None, series: Optional[int] = None, subimg_offset: Optional[List[int]] = None, subimg_size: Optional[List[int]] = None, roi_offset: Optional[List[int]] = None, roi_size: Optional[List[int]] = None ) -> Tuple[Optional[Any], Optional[str]]: """Processes a single image file non-interactively. Assumes that the image has already been set up. Args: path: Path to image from which MagellanMapper-style paths will be generated. proc_type: Processing type, which should be a one of :class:`config.ProcessTypes`. proc_val: Processing value associated with ``proc_type``; defaults to None. series: Image series number; defaults to None. subimg_offset: Sub-image offset as (z,y,x) to load; defaults to None. subimg_size: Sub-image size as (z,y,x) to load; defaults to None. roi_offset: Region of interest offset as (x, y, z) to process; defaults to None. roi_size: Region of interest size of region to process, given as ``(x, y, z)``; defaults to None. Returns: Tuple of stats from processing, or None if no stats, and text feedback from the processing, or None if no feedback. """ # PROCESS BY TYPE stats = None fdbk = None filename_base = importer.filename_to_base(path, series) print("{}\n".format("-" * 80)) if proc_type is config.ProcessTypes.LOAD: # loading completed return None, None elif proc_type is config.ProcessTypes.LOAD: # already imported so does nothing print("imported {}, will exit".format(path)) elif proc_type is config.ProcessTypes.EXPORT_ROIS: # export ROIs; assumes that metadata was already loaded to give smaller # region from which smaller ROIs from the truth DB will be extracted from magmap.io import export_rois db = config.db if config.truth_db is None else config.truth_db export_path = naming.make_subimage_name( filename_base, subimg_offset, subimg_size) export_rois.export_rois( db, config.img5d, config.channel, export_path, config.plot_labels[config.PlotLabels.PADDING], config.unit_factor, config.truth_db_mode, os.path.basename(export_path)) elif proc_type is config.ProcessTypes.TRANSFORM: # transpose, rescale, and/or resize whole large image transformer.transpose_img( path, series, plane=config.plane, rescale=config.transform[config.Transforms.RESCALE], target_size=config.roi_size) elif proc_type in ( config.ProcessTypes.EXTRACT, config.ProcessTypes.ANIMATED): # generate animated GIF or extract single plane export_stack.stack_to_img( config.filenames, roi_offset, roi_size, series, subimg_offset, subimg_size, proc_type is config.ProcessTypes.ANIMATED, config.suffix) elif proc_type is config.ProcessTypes.EXPORT_BLOBS: # export blobs to CSV file from magmap.io import export_rois export_rois.blobs_to_csv(config.blobs.blobs, filename_base) elif proc_type in ( config.ProcessTypes.DETECT, config.ProcessTypes.DETECT_COLOC): # detect blobs in the full image, +/- co-localization coloc = proc_type is config.ProcessTypes.DETECT_COLOC stats, fdbk, _ = stack_detect.detect_blobs_stack( filename_base, subimg_offset, subimg_size, coloc) elif proc_type is config.ProcessTypes.COLOC_MATCH: if config.blobs is not None and config.blobs.blobs is not None: # colocalize blobs in separate channels by matching blobs shape = subimg_size if shape is None: # get shape from loaded image, falling back to its metadata if config.image5d is not None: shape = config.image5d.shape[1:] else: shape = config.img5d.meta[config.MetaKeys.SHAPE][1:] matches = colocalizer.StackColocalizer.colocalize_stack( shape, config.blobs, config.channel) # insert matches into database colocalizer.insert_matches(config.db, matches) else: print("No blobs loaded to colocalize, skipping") elif proc_type is config.ProcessTypes.CLASSIFY: # classify blobs try: classifier.ClassifyImage.classify_whole_image() config.blobs.save_archive() except FileNotFoundError as e: _logger.debug(e) elif proc_type in (config.ProcessTypes.EXPORT_PLANES, config.ProcessTypes.EXPORT_PLANES_CHANNELS): # export each plane as a separate image file export_stack.export_planes( config.image5d, config.savefig, config.channel, proc_type is config.ProcessTypes.EXPORT_PLANES_CHANNELS) elif proc_type is config.ProcessTypes.EXPORT_RAW: # export the main image as a raw data file out_path = libmag.combine_paths(config.filename, ".raw", sep="") libmag.backup_file(out_path) np_io.write_raw_file(config.image5d, out_path) elif proc_type is config.ProcessTypes.EXPORT_TIF: # export the main image as a TIF files for each channel np_io.write_tif(config.image5d, config.filename) elif proc_type is config.ProcessTypes.PREPROCESS: # pre-process a whole image and save to file # TODO: consider chunking option for larger images out_path = config.prefix if not out_path: out_path = libmag.insert_before_ext(config.filename, "_preproc") transformer.preprocess_img( config.image5d, proc_val, config.channel, out_path) return stats, fdbk
[docs] def shutdown(): """Clean up and shutdown MagellanMapper. Stops any running Java virtual machine and closes any main database. """ importer.stop_jvm() if config.db is not None: config.db.conn.close() if config.prefs is not None: config.prefs.save_settings(config.PREFS_PATH) sys.exit()
if __name__ == "__main__": _logger.info("Starting MagellanMapper command-line interface...") main()