"""Decorators that wrap a function.

If the function is defined in the file where you want to use the decorator,
you can decorate the function at define time:

.. code-block:: python

    def somefunc():

If the function is defined somewhere else, do:

.. code-block:: python

    from numpy import mean

    mean = decorator(mean)

from time import time, sleep
from os import makedirs, environ
import os.path
from os.path import join, dirname, getmtime

from .os import _is_cnspath, _no_trailing_slash, cp, rm

from .log import get_logger
logger = get_logger()

[docs]def colossus_interface(somefunc): """Wraps black-box functions to read from and write to Google Colossus. Because it's hard (if possible at all) to figure out which path is input, and which is output, when the input function is black-box, this is a "best-effort" decorator (see below for warnings). This decorator works by looping through all the positional and keyword parameters, copying CNS paths that exist prior to ``somefunc`` execuation to temporary local locations, running ``somefunc`` and writing its output to local locations, and finally copying local paths that get modified by ``somefunc`` to their corresponding CNS locations. Warning: Therefore, if ``somefunc``'s output already exists (e.g., you are re-running the function to overwrite the old result), it will be copied to local, overwritten by ``somefunc`` locally, and finally copied back to CNS. This doesn't lead to wrong behaviors, but is inefficient. This decorator doesn't depend on Blaze, as it's using the ``fileutil`` CLI, rather than ``google3.pyglib.gfile``. This is convenient in at least two cases: - You are too lazy to use Blaze, want to run tests quickly on your local machine, but need access to CNS files. - Your IO is more complex than what ``with gfile.Open(...) as h:`` can do (e.g., a Blender function importing an object from a path), in which case you have to copy the CNS file to local ("local" here could also mean a Borglet's local). This interface generally works with resolved paths (e.g., ``/path/to/file``), but not with wildcard paths (e.g., ``/path/to/???``), sicne it's hard (if possible at all) to guess what your function tries to do with such wildcard paths. Writes - Input files copied from Colossus to ``$TMP/``. - Output files generated to ``$TMP/``, to be copied to Colossus. """ # $TMP set by Borg or yourself (e.g., with .bashrc) tmp_dir = environ.get('TMP', '/tmp/') def gen_local_path(cns_path): keep_last_n = 3 cns_path = _no_trailing_slash(cns_path) local_basename = '_'.join(cns_path.split('/')[-keep_last_n:]) # Prefixed by time to avoid dupes local_path = join(tmp_dir, '%f_%s' % (time(), local_basename)) # local_path guaranteed to not end with '/' return local_path def cp_404ok(src, dst): try: cp(src, dst) logger.debug("\n%s\n\tcopied to\n%s", src, dst) except FileNotFoundError: logger.warning(("Source doesn't exist yet:\n\t%s\n" "OK if this will be the output"), src) def wrapper(*arg, **kwargs): t_eps = 0.05 # seconds buffer for super fast somefunc # Fetch info. for all CNS paths arg_local, kwargs_local = [], {} cns2local = {} # Positional arguments for x in arg: if _is_cnspath(x): local_path = gen_local_path(x) cns2local[x] = local_path arg_local.append(local_path) else: # intact arg_local.append(x) # Keyword arguments for k, v in kwargs.items(): if _is_cnspath(v): local_path = gen_local_path(v) cns2local[v] = local_path kwargs_local[k] = local_path else: # intact kwargs_local[k] = v # For reading: copy CNS paths that exist to local # TODO: what if some of those paths are not input? Copying them to # local is a waste (but harmless) for cns_path, local_path in cns2local.items(): cp_404ok(cns_path, local_path) # Run the real function t0 = time() sleep(t_eps) results = somefunc(*arg_local, **kwargs_local) # For writing: copy local paths that are just modified and correspond # to CNS paths back to CNS for cns_path, local_path in cns2local.items(): if os.path.exists(local_path) and getmtime(local_path) > t0: cp_404ok(local_path, cns_path) # Free up the space by deleting the temporary files for _, local_path in cns2local.items(): rm(local_path) return results return wrapper
[docs]def timeit(somefunc): """Outputs the time a function takes to execute.""" def wrapper(*arg, **kwargs): t0 = time() results = somefunc(*arg, **kwargs) t = time() - t0"Time elapsed: %f seconds", t) return results return wrapper
[docs]def existok(makedirs_func): """Implements the ``exist_ok`` flag in 3.2+, which avoids race conditions, where one parallel worker checks the folder doesn't exist and wants to create it with another worker doing so faster. """ def wrapper(*args, **kwargs): try: makedirs_func(*args, **kwargs) except OSError as e: if e.errno != 17: raise logger.debug("%s already exists, but that is OK", args[0]) return wrapper
[docs]def main(): """Unit tests that can also serve as example usage.""" # timeit @timeit def findsums(x, y, z): sleep(1) return x + y, x + z, y + z, x + y + z print(findsums(1, 2, 3)) # existok newdir = join(dirname(__file__), 'test') makedirs_ = existok(makedirs) makedirs_(newdir) makedirs_(newdir)
if __name__ == '__main__': main()