# Source code for paranoid.decorators

# Copyright 2018 Max Shinn <max@maxshinnpotential.com>
# 
# This file is part of Paranoid Scientist, and is available under the
# MIT license.  Please see LICENSE.txt in the root directory for more
# information.

"""Function (and class) decorators which constitute the primary Paranoid Scientist interface"""

__all__ = ['accepts', 'requires', 'returns', 'ensures', 'paranoidclass', 'paranoidconfig']
import functools, itertools
import inspect
from random import randint
from copy import deepcopy
from . import utils as U
from .types import base as T
from .types.collections import Dict, List
from .types.string import String
from . import exceptions as E
from .settings import Settings

# Constants used internally.  Condition strings given to @ensures are
# rewritten before being compiled: every "`" is replaced by _BT and
# every "return" by _RET, and the reverse substitution is applied when
# building the error message for a failed exit condition.
_BT = "__BACKTICK__"
_RET = "__RETURN__"


def _check_accepts(func, argvals):
    """Enforce the argument types registered on `func` by @accepts.

    Does nothing when `func` has no "argtypes" property.

    Args:
        func: The function whose "argtypes" property is consulted.
        argvals: Mapping of argument name to the value passed in this call.

    Raises:
        E.ArgumentTypeError: If the declared argument names differ from
            the names in `argvals`, or if the `test` method of a
            declared type raises AssertionError for its value.
    """
    if not U.has_fun_prop(func, "argtypes"):
        return
    expected_types = U.get_fun_prop(func, "argtypes")
    # The declared names and the bound argument names must match exactly.
    if sorted(expected_types) != sorted(argvals):
        raise E.ArgumentTypeError("Invalid argument specification in %s" % func.__name__)
    for name, expected in expected_types.items():
        value = argvals[name]
        try:
            expected.test(value)
        except AssertionError:
            raise E.ArgumentTypeError("Invalid argument type: %s=%s is not of type %s in %s" % (name, value, expected, func.__qualname__))

def _check_requires(func, argvals):
    """Evaluate the entry conditions registered on `func` by @requires.

    Does nothing when `func` has no "requires" property.  Each compiled
    condition is evaluated with globals made of a copy of the
    "namespace" setting updated with the call's arguments, and with
    empty locals.

    Args:
        func: The function whose "requires" property is consulted.
        argvals: Mapping of argument name to the value passed in this call.

    Raises:
        E.EntryConditionsError: If a condition evaluates to a falsy
            value, or if any other exception is raised while checking it.
    """
    if not U.has_fun_prop(func, "requires"):
        return
    # Arguments are exposed to the condition by name, on top of the namespace.
    eval_globals = Settings.get("namespace").copy()
    eval_globals.update(argvals)
    for compiled, text, description in U.get_fun_prop(func, "requires"):
        try:
            if not eval(compiled, eval_globals, {}):
                prefix = "" if description is None else description+"\n"
                raise E.EntryConditionsError(prefix+"Function requirement '%s' failed in %s\nparams: %s" % (text,  func.__qualname__, str(argvals)))
        except E.EntryConditionsError:
            # A genuine failed requirement: let it through unchanged.
            raise
        except Exception:
            # Anything else means the condition itself could not be checked.
            raise E.EntryConditionsError("Invalid function requirement '%s' in %s\nparams: %s" % (text,  func.__qualname__, str(argvals)))

def _check_returns(func, returnvalue):
    """Enforce the return type registered on `func` by @returns.

    Does nothing when `func` has no "returntype" property.

    Args:
        func: The function whose "returntype" property is consulted.
        returnvalue: The value the function returned for this call.

    Raises:
        E.ReturnTypeError: If checking the value raises AssertionError.
    """
    if not U.has_fun_prop(func, "returntype"):
        return
    try:
        expected = U.get_fun_prop(func, "returntype")
        expected.test(returnvalue)
    except AssertionError:
        raise E.ReturnTypeError("Invalid return type of %s in %s" % (returnvalue, func.__qualname__) )

def _check_ensures(func, returnvalue, argvals):
    """Evaluate the exit conditions registered on `func` by @ensures.

    Does nothing when `func` has no "ensures" property.  Each condition
    is evaluated against this call and, when it refers to previous
    executions (backtick depth > 0), against every ordering of this
    call with every combination of that many cached calls.

    If any condition has a backtick depth above zero, this call is then
    offered to the per-function execution cache, which is maintained by
    reservoir sampling and bounded by the "max_cache" setting.

    Args:
        func: The function whose "ensures" property is consulted.
        returnvalue: The value the function returned for this call.
        argvals: Mapping of argument name to the value passed in this
            call.  It is copied, not modified.

    Raises:
        E.ExitConditionsError: If a condition evaluates to a falsy value.
    """
    if not U.has_fun_prop(func, "ensures"):
        return
    ensures_statements = U.get_fun_prop(func, "ensures")
    # Snapshot of this call.  Copy so that adding the return value does
    # not leak the _RET key into the caller's dictionary.
    current_call = dict(argvals)
    current_call[_RET] = returnvalue
    if U.has_fun_prop(func, "exec_cache"):
        exec_cache = U.get_fun_prop(func, "exec_cache")
    else:
        exec_cache = []
    for btdepth, ensurement, etext in ensures_statements:
        # Check the higher order properties, e.g. x, x`, and x``.
        # With btdepth == 0 there is exactly one (empty) combination,
        # so the condition is checked once against this call alone.
        for comb in itertools.combinations(exec_cache, btdepth):
            for params in itertools.permutations([current_call]+list(comb)):
                env = dict()
                for i in range(0, btdepth+1):
                    # The i-th call's names carry i backtick placeholders.
                    bts = _BT * i
                    env.update({k+bts : v for k,v in params[i].items()})
                limited_globals = Settings.get("namespace").copy()
                limited_globals.update(env)
                if not eval(ensurement, limited_globals, {}):
                    # Undo the placeholder substitution for readability.
                    env_simp = {k.replace(_BT, '`').replace(_RET, 'return'): v for k,v in env.items()}
                    raise E.ExitConditionsError("Ensures statement '%s' failed in %s\nparams: %s" % (etext, func.__qualname__, str(env_simp)))
    # Update the cache, but only if some condition refers to previous
    # executions.
    if any(btdepth > 0 for btdepth, _, _ in ensures_statements):
        # Keep track of number of executions for reservoir sampling
        # probabilities
        if exec_cache == []:
            n_execs = 1
            U.set_fun_prop(func, "exec_cache", exec_cache) # Create exec cache if it doesn't exist
        else:
            n_execs = U.get_fun_prop(func, "n_execs") + 1
        U.set_fun_prop(func, "n_execs", n_execs)
        # Use reservoir sampling to maintain the cache
        max_cache_size = Settings.get("max_cache", function=func)
        if n_execs <= max_cache_size:
            exec_cache.append(current_call)
        else:
            # randint is inclusive on both ends, so draw from exactly
            # n_execs slots; the new call is then kept with probability
            # max_cache_size / n_execs.
            rn = randint(0, n_execs - 1)
            if rn < max_cache_size:
                exec_cache[rn] = current_call


def _wrap(func):
    """Return `func` wrapped so that paranoid checks run on every call.

    If `func` already carries the "active" property it is returned
    unchanged.  Otherwise the property is set and a wrapper is returned
    which, unless the "enabled" setting is False for `func`, checks
    argument types and entry conditions, calls `func`, then checks the
    return type and exit conditions.

    Args:
        func: The function to wrap.

    Returns:
        Either `func` itself or the checking wrapper around it.
    """
    if U.has_fun_prop(func, "active"):
        return func

    def _decorated(*args, **kwargs):
        # Skip verification if paranoid is disabled.
        if Settings.get("enabled", function=func) == False:
            return func(*args, **kwargs)
        # Bind the arguments a single time per call and share the
        # result between all of the check functions.
        signature = inspect.Signature.from_callable(func)
        bound = signature.bind_partial(*args, **kwargs)
        bound.apply_defaults()
        argvals = dict(bound.arguments)

        # Entry checks, the call itself, then exit checks.
        _check_accepts(func, argvals)
        _check_requires(func, argvals)
        result = func(*args, **kwargs)
        _check_returns(func, result)
        _check_ensures(func, result, argvals)
        return result

    U.set_fun_prop(func, "active", True)
    # Besides the usual wrapper attributes, carry over the function
    # properties and the per-function settings.
    copied_attrs = functools.WRAPPER_ASSIGNMENTS + (
        U._FUN_PROPS,
        Settings.FUNCTION_SETTINGS_NAME,
    )
    wrapped = functools.wraps(func, assigned=copied_attrs)(_decorated)
    # A list of all functions for when Paranoid Scientist is invoked
    # with "python3 -m paranoid scriptname.py".  The global name
    # "__ALL_FUNCTIONS" only exists in that case; when it does, the
    # wrapper is registered in it.
    if "__ALL_FUNCTIONS" in globals():
        __ALL_FUNCTIONS.append(wrapped)
    return wrapped

def accepts(*argtypes, **kwargtypes):
    """A function decorator to specify argument types of the function.

    Types may be given positionally, in the order the parameters appear
    in the function, or as keyword arguments named after the parameters
    (just as if you were calling the function).

    Example usage:

      | @accepts(Positive0)
      | def square_root(x):
      |     ...
    """
    positional_types = [T.TypeFactory(t) for t in argtypes]
    keyword_types = {name: T.TypeFactory(t) for name, t in kwargtypes.items()}

    def _decorator(func):
        # @accepts decorator
        # Inspect the underlying function if `func` is already a wrapper.
        target = getattr(func, "__wrapped__", func)
        sig = inspect.signature(target)
        declared = sig.bind(*positional_types, **keyword_types).arguments
        resolved = {}
        # Walk the function's parameters and make sure each ordinary
        # one was given a type in @accepts.
        for param in sig.parameters.values():
            if param.kind == param.VAR_KEYWORD:
                # **kwargs always gets the KeywordArguments() type.
                if param.name in declared:
                    raise E.ArgumentTypeError("Unexpected keyword arguments to @accepts in %s" % target.__qualname__)
                resolved[param.name] = T.KeywordArguments()
            elif param.kind == param.VAR_POSITIONAL:
                # *args always gets the PositionalArguments() type.
                if param.name in declared:
                    raise E.ArgumentTypeError("Unexpected arguments to @accepts in %s" % target.__qualname__)
                resolved[param.name] = T.PositionalArguments()
            else:
                # Every normal argument must have been specified.
                if param.name not in declared:
                    raise E.ArgumentTypeError("Invalid argument specification to @accepts in %s" % target.__qualname__)
                resolved[param.name] = T.TypeFactory(declared[param.name])
        # Make sure extra arguments weren't passed to @accepts
        if set(declared) - set(sig.parameters):
            raise E.ArgumentTypeError("Invalid argument specification to @accepts in %s" % target.__qualname__)
        # Make sure @accepts hasn't already been called on this function
        if U.has_fun_prop(func, "argtypes"):
            raise ValueError("Cannot set argument types twice")
        U.set_fun_prop(func, "argtypes", resolved)
        return _wrap(func)
    return _decorator
def returns(returntype):
    """A function decorator to specify the return type of the function.

    Example usage:

      | @accepts(Positive0)
      | @returns(Positive0)
      | def square_root(x):
      |     ...
    """
    expected = T.TypeFactory(returntype)

    def _decorator(func):
        # @returns decorator
        if U.has_fun_prop(func, "returntype"):
            raise ValueError("Cannot set return type twice")
        U.set_fun_prop(func, "returntype", expected)
        return _wrap(func)
    return _decorator
# Adds the "requires" property: list of (compiledcondition, conditiontext, description)
def requires(condition, description=None):
    """A function decorator to specify entry conditions for the function.

    Entry conditions should be a string, which will be evaluated as
    Python code.  Arguments of the function may be accessed by their
    name.

    Optionally, the second argument ("description") may describe the
    requirement in human-understandable language.  This will be
    displayed in the error message, and may make it easier to debug
    software, or for users to know without looking at the code what
    may be wrong with the function call.

    The special syntax "-->" and "<-->" may be used to mean "if" and
    "if and only if", respectively.  They may not be contained within
    sub-expressions.

    Note that globals will not be included by default, and must be
    manually included using the "namespace" setting, set via
    settings.Settings.

    Example usage:

      | @requires("x >= y")
      | def subtract(x, y):
      |     ...

      | @accepts(l=List(Number), log_transform=Boolean)
      | @requires("log_transform == True --> min(l) > 0")
      | def process_list(l, log_transform=False):
      |     ...
    """
    def _decorator(func, condition=condition, description=description):
        # @requires decorator
        if U.has_fun_prop(func, "requires"):
            if not isinstance(U.get_fun_prop(func, "requires"), list):
                raise E.InternalError("Invalid requires structure")
            base_requires = U.get_fun_prop(func, "requires")
        else:
            base_requires = []
        # Keep the condition exactly as written: `condition` is
        # rewritten below, but messages should show the original.
        base_condition = condition
        if "<-->" in condition:
            condition_parts = condition.split("<-->")
            assert len(condition_parts) == 2, "Only one implies per statement in %s condition %s" % (base_condition, func.__qualname__)
            # "a <--> b" becomes "(a implies b) and (b implies a)".
            condition = "((%s) if (%s) else True) and ((%s) if (%s) else True)" % (condition_parts[1], condition_parts[0], condition_parts[0], condition_parts[1])
        elif "-->" in condition:
            condition_parts = condition.split("-->")
            assert len(condition_parts) == 2, "Only one implies per statement in %s condition %s" % (base_condition, func.__qualname__)
            # "a --> b" becomes "b if a else True".
            condition = "(%s) if (%s) else True" % (condition_parts[1], condition_parts[0])
        # Store the text as the user wrote it, so that a failing
        # requirement is reported in its original form.  The new entry
        # goes in front of any requirements already registered.
        U.set_fun_prop(func, "requires", [(compile(condition, '', 'eval'), base_condition, description)]+base_requires)
        return _wrap(func)
    return _decorator
# Adds the "ensures" property: list of (backtickdepth, compiledcondition, conditiontext)
def ensures(condition):
    """A function decorator to specify exit conditions for the function.

    Exit conditions should be a string, which will be evaluated as
    Python code.  Arguments of the function may be accessed by their
    name.  The return value of the function may be accessed using the
    special variable name "return".

    The special syntax "-->" and "<-->" may be used to mean "if" and
    "if and only if", respectively.  They may not be contained within
    sub-expressions.

    Values may be compared to previous executions of the function by
    including a "`" or "``" after them to check for higher order
    properties of the function.

    Note that globals will not be included by default, and must be
    manually included using the "namespace" setting, set via
    settings.Settings.

    Example usage:

      | @ensures("lower_bound <= return <= upper_bound")
      | def search(lower_bound, upper_bound):
      |     ...

      | @ensures("x <= x` --> return <= return`")
      | def monotonic(x):
      |     ...
    """
    def _decorator(func, condition=condition):
        # @ensures decorator
        if U.has_fun_prop(func, "ensures"):
            if not isinstance(U.get_fun_prop(func, "ensures"), list):
                raise E.InternalError("Invalid ensures strucutre")
            ensures_statements = U.get_fun_prop(func, "ensures")
        else:
            ensures_statements = []
        # NOTE(review): str.replace rewrites every occurrence of
        # "return", including inside longer identifiers.
        e = condition.replace("return", _RET)
        if "<-->" in e:
            e_parts = e.split("<-->")
            assert len(e_parts) == 2, "Only one implies per statement in %s condition %s" % (condition, func.__qualname__)
            # "a <--> b" becomes "(a implies b) and (b implies a)".
            e = "((%s) if (%s) else True) and ((%s) if (%s) else True)" % (e_parts[1], e_parts[0], e_parts[0], e_parts[1])
            # A "-->" left over after the rewrite means a second
            # implication was present.
            assert "-->" not in e, "Only one implies per statement in %s condition %s" % (condition, func.__qualname__)
        if "-->" in e:
            e_parts = e.split("-->")
            assert len(e_parts) == 2, "Only one implies per statement in %s condition %s" % (condition, func.__qualname__)
            # "a --> b" becomes "b if a else True".
            e = "(%s) if (%s) else True" % (e_parts[1], e_parts[0])
        # btdepth is the maximum number of consecutive ` characters
        # that appears in the ensures statement; it determines how
        # many cached executions each check is compared against.
        btdepth = max([0]+[sum(1 for x in g) for c,g in itertools.groupby(e) if c == "`"])
        # Backticks are not valid in identifiers, so swap in a placeholder.
        e = e.replace("`", _BT)
        compiled = compile(e, '', 'eval')
        # The new entry goes in front of any statements already registered.
        U.set_fun_prop(func, "ensures", [(btdepth, compiled, condition)]+ensures_statements)
        return _wrap(func)
    return _decorator
def paranoidclass(cls):
    """A class decorator to specify that class methods contain paranoid decorators.

    Every attribute of the class that carries "argtypes" or
    "returntype" function properties has its `Self` placeholders
    replaced by types built from the class itself.

    Example usage:

      | @paranoidclass
      | class Point:
      |     def __init__(self, x, y):
      |         ...
      |     @returns(Number)
      |     def distance_from_zero():
      |         ...
    """
    for methname in dir(cls):
        meth = getattr(cls, methname)
        if U.has_fun_prop(meth, "argtypes"):
            argtypes = U.get_fun_prop(meth, "argtypes")
            for argname in list(argtypes):
                if isinstance(argtypes[argname], T.Self):
                    # "self" means something different in the __init__
                    # method than it does in other methods
                    is_init_self = methname == "__init__" and argname == "self"
                    argtypes[argname] = T.InitGeneric(cls) if is_init_self else T.Generic(cls)
                # Somewhat of a hack to get And/Or to work with Self
                current = argtypes[argname]
                if isinstance(current, (T.Or, T.And)):
                    for idx, member in enumerate(current.types):
                        if isinstance(member, T.Self):
                            current.types[idx] = T.Generic(cls)
        if U.has_fun_prop(meth, "returntype"):
            if isinstance(U.get_fun_prop(meth, "returntype"), T.Self):
                U.set_fun_prop(meth, "returntype", T.Generic(cls))
    return cls
def paranoidconfig(**kwargs):
    """A function decorator to set a local setting.

    Settings may be set either globally (using
    settings.Settings.set()) or locally using this decorator.  Each
    keyword argument names a setting, and its value is the value to
    assign to that setting for the decorated function.  See
    settings.Settings for the different settings which can be set.

    Example usage:

      | @returns(Number)
      | @paranoidconfig(enabled=False)
      | def slow_function():
      |     ...
    """
    def _decorator(func):
        for name, value in kwargs.items():
            Settings._set(name, value, function=func)
        return _wrap(func)
    return _decorator