# Input/Output with the Python Subprocess module# Author: David Young, 2020"""File management using underlying system commands accessed through thePython :mod:`subprocess` module."""importosimportsubprocessfrommagmap.ioimportlibmag
[docs]defcompress_file(path_in,path_out=None):"""Compress a file or files by tar archving and compressing with. Assumes that ``tar`` and ``zstd`` are available shell commands. Args: path_in (str, List[str]): Input file path; can be a sequence of paths, in which case the first path will determine the working directory and the default ``path_out`` if none is given. path_out (str): Output file path; defaults to None to use the same path as ``path_in`` with ``.tar.zstd`` appended. """tar_args=["tar","cfv","-"]iflibmag.is_seq(path_in):# set up sequence of paths, using the first path as templatepath_first=path_in[0]tar_args.extend([os.path.basename(p)forpinpath_in])else:# set up a single pathpath_first=path_intar_args.append(os.path.basename(path_in))ifpath_outisNone:# default to using the first path for compressed file namepath_out=os.path.splitext(path_first)[0]+".tar.zst"# use first path for working directorywd=os.path.dirname(path_first)ifnotwd:wd=None# archive with tar and pipe to zstd for compressionprint("Compressing \"{}\" to \"{}\"".format(path_in,path_out))tar=subprocess.Popen(tar_args,cwd=wd,stdout=subprocess.PIPE,bufsize=0)withopen(path_out,"wb")asf:zstd=subprocess.Popen(["pzstd","-v"],stdin=tar.stdout,stdout=f,bufsize=0)tar.stdout.close()# appears to work for large image with memory issues because data# are backed by filesstderr=zstd.communicate()[1]ifstderr:print(stderr)
[docs]deftest_compression(path):"""Test the integrity of a file compressed by ZSTD. Args: path (str): Path to compressed file. Returns: bool: True if the integrity check completed without error; False if otherwise. """print("Testing integrity of compressed file:",path)try:subprocess.check_call(["pzstd","-t",path])print("Integrity test of \"{}\" completed without error".format(path))returnTrueexceptsubprocess.CalledProcessErrorase:print(e)print("Error during compression integrity testing of",path)returnFalse
[docs]defdecompress_file(path_in,dir_out=None):"""Decompress and unarchive a file. Assumes that the file has been archived by ``tar`` and compressed by ``zstd``, both available as shell commands. Args: path_in (str): Input path. dir_out (str): Output directory path; defaults to None to output to the current directory. """tar_args=["tar","xvf","-"]ifdir_out:ifnotos.path.isdir(dir_out):os.makedirs(dir_out)tar_args.extend(["-C",dir_out])else:dir_out="."print("decompressing {} to {}".format(path_in,dir_out))zst=subprocess.Popen(["pzstd","-dc",path_in],stdout=subprocess.PIPE,bufsize=0)tar=subprocess.Popen(tar_args,stdin=zst.stdout,bufsize=0)zst.stdout.close()stderr=tar.communicate()[1]ifstderr:print(stderr)