PKź6\`ENENdispatch/functions.py"""Generic function implementations""" from __future__ import generators from dispatch.interfaces import * import protocols, inspect, sys, dispatch from peak.util.decorators import decorate_assignment, frameinfo, decorate_class from protocols.interfaces import allocate_lock from new import instancemethod from types import FunctionType, ClassType, InstanceType ClassTypes = (ClassType, type) __all__ = [ 'GenericFunction', 'Dispatcher', 'AbstractGeneric', ] _NF = (0,None, NoApplicableMethods(), (None,None)) try: frozenset except NameError: from sets import ImmutableSet as frozenset def _mkGeneric(oldfunc,argname): funcname = oldfunc.__name__ args, varargs, kwargs, defaults = inspect.getargspec(oldfunc) if defaults: tmpd = ["=__gfDefaults[%s]" % i for i in range(len(defaults))] else: tmpd = None argspec = inspect.formatargspec( args, varargs, kwargs, tmpd, formatvalue=lambda x:x) outargs = inspect.formatargspec(args,varargs,kwargs) protocol = protocols.Protocol() d={} s= """ def setup(__gfProtocol, __gfDefaults): def %(funcname)s%(argspec)s: __gfWhat = __gfProtocol(%(argname)s,None) if __gfWhat is None: raise NoApplicableMethods(%(argname)s) else: %(argname)s = __gfWhat[0] return __gfWhat[1]%(outargs)s return %(funcname)s """ % locals() exec s in globals(),d; func = d['setup'](protocol,defaults) def when(cond): """Add following function to this GF, using 'cond' as a guard""" def callback(frm,name,value,old_locals): declarePredicate(cond, protocol, lambda ob: (ob,value)) if old_locals.get(name) is func: return func return value return decorate_assignment(callback) def addMethod(cond,func): """Use 'func' when dispatch argument matches 'cond'""" declarePredicate(cond, protocol, lambda ob: (ob,func)) def clone(): """Return a simple generic function that "inherits" from this one""" f = _mkGeneric(oldfunc,argname) protocols.declareAdapter( protocols.NO_ADAPTER_NEEDED,[f.protocol],forProtocols=[protocol] ) return f func.addMethod = addMethod func.when = when func.clone = clone func.protocol = protocol func.__doc__ = oldfunc.__doc__ protocols.adviseObject(func,provides=[IExtensibleFunction]) return func # Bootstrap SimpleGeneric declaration helper function -- itself a SimpleGeneric [dispatch.on('ob')] def declarePredicate(ob,proto,factory): """Declare a SimpleGeneric dispatch predicate""" declarePredicate = _mkGeneric(declarePredicate,'ob') proto = declarePredicate.protocol def declareForType(typ,proto,factory): protocols.declareAdapter(factory,provides=[proto],forTypes=[typ]) def declareForProto(pro,proto,factory): protocols.declareAdapter(factory,provides=[proto],forProtocols=[pro]) def declareForSequence(seq,proto,factory): for item in seq: declarePredicate(item,proto,factory) declareForType(ClassType, proto, lambda ob:(ob,declareForType)) declareForType(type, proto, lambda ob:(ob,declareForType)) declareForProto(protocols.IOpenProtocol,proto, lambda ob:(ob,declareForProto)) declareForProto(protocols.IBasicSequence,proto, lambda ob:(ob,declareForSequence)) class ExprCache(object): __slots__ = 'cache','argtuple','expr_defs' def __init__(self,argtuple,expr_defs): self.argtuple = argtuple self.expr_defs = expr_defs self.cache = {} def __getitem__(self,item): if item==EXPR_GETTER_ID: return self.__getitem__ try: return self.argtuple[item] except IndexError: pass try: return self.cache[item] except KeyError: pass f,args = self.expr_defs[item] f = self.cache[item] = f(*map(self.__getitem__,args)) return f class BaseDispatcher: def __getitem__(self,argtuple): argct = self.argct node = self._dispatcher or self._startNode() or _NF expr, factory, func, init = node while factory: if func is None: self._acquire() try: if node[2] is None: node[2] = func = factory(*init) finally: self._release() if expr'method'""" cond = self.parseRule(signature) if cond is not None: for signature in IDispatchPredicate(cond): self[signature] = method return self._acquire() try: signature = strategy.Signature( [(self._dispatch_id(expr,criterion),criterion) for expr,criterion in ISignature(signature).items() if criterion is not strategy.NullCriterion ] ) self._addCase((signature, method)) self._addConstraints(signature) finally: self._release() [dispatch.on('rule')] def parseRule(self,rule,frame=None,depth=3): """Parse 'rule' if it's a string/unicode, otherwise return 'None'""" [parseRule.when([str,unicode])] def parseRule(self,rule,frame,depth): frame = frame or sys._getframe(depth) return self.parse(rule, frame.f_locals, frame.f_globals) [parseRule.when(object)] def parseRule(self,rule,frame,depth): return None def combine(self,cases): return strategy.single_best(cases) def _best_split(self, cases, disp_ids): """Return best (disp_id,method_map,remaining_ids) for current subtree""" best_id = None best_map = None best_spread = None remaining_ids = list(disp_ids) active_cases = len(cases) disabled = self.constraints.successors(remaining_ids) skipped = [] to_do = remaining_ids[:] for disp_id in to_do: if disp_id in disabled: # Skip criteria that have unchecked prerequisites skipped.append(disp_id) continue index = self.disp_indexes[disp_id] lindex, total_cases = index.count_for(cases) if total_cases == active_cases * lindex: # None of the index keys for this expression eliminate any # cases, so this expression isn't needed for dispatching remaining_ids.remove(disp_id) disabled = self.constraints.successors(remaining_ids) to_do.extend(skipped); skipped=[] continue spread = float(total_cases) / lindex if spread < best_spread or best_spread is None: best_spread = spread best_id = disp_id if best_id is not None: remaining_ids.remove(best_id) return best_id, tuple(remaining_ids) def _dispatch_id(self,(expr,disp_func),criterion): """Replace expr/criterion with a local key""" criterion.subscribe(self) expr = self.getExpressionId(expr) disp = expr, criterion.node_type if disp not in self.disp_indexes: self.disp_indexes[disp] = criterion.node_type.make_index() return expr def getExpressionId(self,expr): """Replace 'expr' with a local expression ID number""" # XXX this isn't threadsafe if not called from 'asFuncAndIds' try: return self.expr_map[expr] except KeyError: expr_def = IDispatchableExpression(expr).asFuncAndIds(self) try: return self.expr_map[expr_def] except KeyError: expr_id = len(self.expr_defs) self.expr_map[expr] = self.expr_map[expr_def] = expr_id self.expr_defs.append(expr_def) return expr_id def _addConstraints(self, signature): pre = [] for key,criterion in signature.items(): if key[0] >= self.argct: # constrain non-argument exprs for item in pre: self.constraints.add(item,key) pre.append(key) class AbstractGeneric(Dispatcher): protocols.advise(instancesProvide=[IGenericFunction]) delegate = None def __init__(self,func): self.delegate, args = _mkNormalizer(func, self) self.delegate.__dict__ = dict( [(k,getattr(self,k)) for k in dir(self.__class__) if not k.startswith('_')] ) self.delegate.__doc__ = self.__doc__ = func.__doc__ protocols.adviseObject(self.delegate,[IGenericFunction]) self.__name__ = func.__name__; self.__call__ = self.delegate Dispatcher.__init__(self,args) # We can't be used as a method, but make pydoc think we're a callable __get__ = None def addMethod(self,predicate,function,qualifier=None): if qualifier is not None: function = qualifier,function for signature in IDispatchPredicate(predicate): self[signature] = function def combine(self,cases): raise NotImplementedError( "The purpose of this class is to support *custom* method combiners" ) def __call__(__self,*args,**kw): return __self.delegate(*args,**kw) def _decorate(self,cond,qualifier=None,frame=None,depth=2): # XXX frame = frame or sys._getframe(depth) cond = self.parseRule(cond,frame=frame) or cond def registerMethod(frm,name,value,old_locals): if qualifier is None: func = value else: func = qualifier,value kind,module,locals_,globals_ = frameinfo(frm) if kind=='class': # 'when()' in class body; defer adding the method def registerClassSpecificMethod(cls): req = strategy.Signature( [(strategy.Argument(0),ICriterion(cls))] ) self.addMethod(req & cond, func) return cls decorate_class(registerClassSpecificMethod,frame=frm) else: self.addMethod(cond,func) if old_locals.get(name) in (self,self.delegate): return self.delegate return value return decorate_assignment(registerMethod,frame=frame) class GenericFunction(AbstractGeneric): """Extensible predicate dispatch generic function""" def combine(self,cases): strict = [strategy.ordered_signatures,strategy.safe_methods] loose = [strategy.ordered_signatures,strategy.all_methods] cases = strategy.separate_qualifiers( cases, around = strict, before = loose, primary = strict, after =loose, ) primary = strategy.method_chain(cases.get('primary',[])) if cases.get('after') or cases.get('before'): befores = strategy.method_list(cases.get('before',[])) afters = strategy.method_list(list(cases.get('after',[]))[::-1]) def chain(*args,**kw): for tmp in befores(*args,**kw): pass # toss return values result = primary(*args,**kw) for tmp in afters(*args,**kw): pass # toss return values return result else: chain = primary if cases.get('around'): chain = strategy.method_chain(list(cases['around'])+[chain]) return chain def around(self,cond): """Add function as an "around" method w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond,"around") def before(self,cond): """Add function as a "before" method w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond,"before") def after(self,cond): """Add function as an "after" method w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond,"after") def when(self,cond): """Add following function to this GF, w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond) def _mkNormalizer(func,dispatcher): funcname = func.__name__ if funcname=='': funcname = "anonymous" args, varargs, kwargs, defaults = inspect.getargspec(func) if defaults: tmpd = ["=__gfDefaults[%s]" % i for i in range(len(defaults))] else: tmpd = None argspec = inspect.formatargspec( args, varargs, kwargs, tmpd, formatvalue=lambda x:x) allargs = inspect.formatargspec(args,varargs,kwargs) outargs = inspect.formatargspec(args, varargs, kwargs, formatvarargs=lambda name:name, formatvarkw=lambda name:name, join=lambda seq:','.join(seq)) outargs = outargs[1:-1]+',' if outargs==',': outargs='' retargs = [] else: retargs = filter(None,outargs.replace(' ','').split(',')) d ={} s = """ def setup(__dispatcher,__gfDefaults): def %(funcname)s%(argspec)s: return __dispatcher((%(outargs)s))%(allargs)s return %(funcname)s """ % locals() exec s in globals(),d return d['setup'](dispatcher.__getitem__,defaults), retargs defaultNormalize = lambda *__args: __args PKź6f(!&!&dispatch/ast_builder.pyfrom token import tok_name, NAME, NUMBER, STRING, ISNONTERMINAL from symbol import sym_name from new import instancemethod import token, symbol, parser, sys __all__ = [ 'parse_expr', 'build' ] _name = lambda builder,nodelist: builder.Name(nodelist[1]) _const = lambda builder,nodelist: builder.Const(eval(nodelist[1])) production = { NAME: _name, NUMBER: _const, STRING: _const, } ops = { token.LEFTSHIFT: 'LeftShift', token.RIGHTSHIFT: 'RightShift', token.PLUS: 'Add', token.MINUS: 'Sub', token.STAR: 'Mul', token.SLASH: 'Div', token.PERCENT: 'Mod', token.DOUBLESLASH: 'FloorDiv', } def left_assoc(builder, nodelist): return getattr(builder,ops[nodelist[-2][0]])(nodelist[:-2],nodelist[-1]) def curry(f,*args): for arg in args: f = instancemethod(f,arg,type(arg)) return f def com_binary(opname, builder,nodelist): "Compile 'NODE (OP NODE)*' into (type, [ node1, ..., nodeN ])." items = [nodelist[i] for i in range(1,len(nodelist),2)] return getattr(builder,opname)(items) # testlist: test (',' test)* [','] # subscriptlist: subscript (',' subscript)* [','] testlist = subscriptlist = curry(com_binary, 'Tuple') # testlist_gexp test (gen_for | (',' test)* [',']) testlist_gexp = testlist # XXX # test: and_test ('or' and_test)* | lambdef test = curry(com_binary, 'Or') if sys.version>='2.5': or_test = test # test: or_test ['if' or_test 'else' test] | lambdef def test(builder, nodelist): return builder.IfElse(nodelist[1], nodelist[3], nodelist[5]) # and_test: not_test ('and' not_test)* and_test = curry(com_binary, 'And') # not_test: 'not' not_test | comparison def not_test(builder, nodelist): return builder.Not(nodelist[2]) # comparison: expr (comp_op expr)* def comparison(builder, nodelist): if len(nodelist)>4 and builder.simplify_comparisons: # Reduce (x < y < z ...) to (x' | '>=' | '<=' | '<>' | '!=' | '==' # | 'in' | 'not' 'in' | 'is' | 'is' 'not' n = nl[1] if n[0] == token.NAME: type = n[1] if len(nl) == 3: if type == 'not': type = 'not in' else: type = 'is not' else: type = n[1] results.append((type, nodelist[i])) return builder.Compare(nodelist[1], results) # expr: xor_expr ('|' xor_expr)* expr = curry(com_binary, 'Bitor') # xor_expr: and_expr ('^' and_expr)* xor_expr = curry(com_binary, 'Bitxor') # and_expr: shift_expr ('&' shift_expr)* and_expr = curry(com_binary, 'Bitand') # shift_expr: arith_expr ('<<'|'>>' arith_expr)* # arith_expr: term (('+'|'-') term)* # term: factor (('*'|'/'|'%'|'//') factor)* shift_expr = arith_expr = term = left_assoc unary_ops = { token.PLUS: 'UnaryPlus', token.MINUS: 'UnaryMinus', token.TILDE: 'Invert', } # factor: ('+'|'-'|'~') factor | power def factor(builder, nodelist): return getattr(builder,unary_ops[nodelist[1][0]])(nodelist[2]) # power: atom trailer* ['**' factor] def power(builder, nodelist): if nodelist[-2][0]==token.DOUBLESTAR: return builder.Power(nodelist[:-2], nodelist[-1]) node = nodelist[-1] nodelist = nodelist[:-1] t = node[1][0] # trailer: '(' [arglist] ')' | '[' subscriptlist ']' | '.' NAME if t == token.LPAR: return com_call_function(builder,nodelist,node[2]) elif t == token.DOT: return builder.Getattr(nodelist, node[2][1]) elif t == token.LSQB: item = node[2] while len(item)==2: item = item[1] if item[0]==token.COLON: lineno = item[2] return builder.Subscript(nodelist, (symbol.subscript, (token.STRING,`0`,lineno), item, (token.STRING,`sys.maxint`,lineno) ) ) return builder.Subscript(nodelist, item) raise AssertionError("Unknown power", nodelist) # atom: '(' [testlist_gexp] ')' | # '[' [listmaker] ']' | # '{' [dictmaker] '}' | # '`' testlist1 '`' | # NAME | NUMBER | STRING+ def atom(builder, nodelist): t = nodelist[1][0] if t == token.LPAR: if nodelist[2][0] == token.RPAR: return builder.Tuple(()) return build(builder,nodelist[2]) elif t==token.LSQB: if nodelist[2][0] == token.RSQB: return builder.List(()) return listmaker(builder,nodelist[2]) elif t==token.LBRACE: if nodelist[2][0] == token.RBRACE: items = () else: dm = nodelist[2] items = [(dm[i],dm[i+2]) for i in range(1,len(dm),4)] return builder.Dict(items) elif t==token.BACKQUOTE: return builder.Backquote(nodelist[2]) elif t==token.STRING: return builder.Const(eval(' '.join([n[1] for n in nodelist[1:]]))) raise AssertionError("Unknown atom", nodelist) # arglist: (argument ',')* (argument [',']| '*' test [',' '**' test] | '**' test) def com_call_function(builder, primaryNode, nodelist): if nodelist[0] == token.RPAR: return builder.CallFunc(primaryNode,(),(),None,None) args = []; kw = [] len_nodelist = len(nodelist) for i in range(1, len_nodelist, 2): node = nodelist[i] if node[0] == token.STAR or node[0] == token.DOUBLESTAR: break iskw, result = com_argument(node, kw) if iskw: kw.append(result) else: args.append(result) else: # No broken by star arg, so skip the last one we processed. i = i + 1 if i < len_nodelist and nodelist[i][0] == token.COMMA: # need to accept an application that looks like "f(a, b,)" i = i + 1 star_node = dstar_node = None while i < len_nodelist: tok = nodelist[i] ch = nodelist[i+1] i = i + 3 if tok[0]==token.STAR: star_node = ch elif tok[0]==token.DOUBLESTAR: dstar_node = ch else: raise AssertionError, 'unknown node type: %s' % (tok,) return builder.CallFunc(primaryNode, args, kw, star_node, dstar_node) # argument: [test '='] test [gen_for] (Really [keyword '='] test) def com_argument(nodelist, kw): # XXX Python 2.4 needs genexp handling here if len(nodelist) == 2: if kw: raise SyntaxError, "non-keyword arg after keyword arg" return 0, nodelist[1] n = nodelist[1] while len(n) == 2 and n[0] != token.NAME: n = n[1] if n[0] != token.NAME: raise SyntaxError, "keyword can't be an expression (%r)" % (n,) return 1, ((token.STRING,`n[1]`,n[2]), nodelist[3]) # listmaker: test ( list_for | (',' test)* [','] ) def listmaker(builder, nodelist): values = [] for i in range(1, len(nodelist)): #if nodelist[i][0] == symbol.list_for: # assert len(nodelist[i:]) == 1 # return com_list_comprehension(builder,values[0],nodelist[i]) if nodelist[i][0] == token.COMMA: continue values.append(nodelist[i]) return builder.List(values) # subscript: '.' '.' '.' | test | [test] ':' [test] [sliceop] # sliceop: ':' [test] def subscript(builder, nodelist): if nodelist[1][0]==token.DOT: return builder.Const(Ellipsis) item = nodelist; nl = len(nodelist) while type(item[1]) is tuple: item=item[1] # find token, to get a line# lineno = item[-1] have_stride = nodelist[-1][0]==symbol.sliceop if have_stride: start = stop = stride = (token.STRING, 'None', lineno) if len(nodelist[-1])==3: stride = nodelist[-1][2] else: start = (token.NUMBER,`0`,lineno) stop = (token.NUMBER,`sys.maxint`,lineno) if nl==5: start,stop = nodelist[1],nodelist[3] # test : test sliceop elif nl==4: if nodelist[1][0]==token.COLON: # : test sliceop stop = nodelist[2] elif have_stride: # test : sliceop start = nodelist[1] else: start, stop = nodelist[1], nodelist[3] # test : test elif nl==3: if nodelist[1][0]==token.COLON: if not have_stride: stop = nodelist[2] # : test else: start = nodelist[1] # test : else: raise AssertionError("Unrecognized subscript", nodelist) if have_stride: return builder.Sliceobj(start,stop,stride) return builder.Slice(start,stop) for sym,name in sym_name.items(): if name in globals(): production[sym] = globals()[name] def build(builder, nodelist): while len(nodelist)==2: nodelist = nodelist[1] return production[nodelist[0]](builder,nodelist) def parse_expr(expr,builder): # include line numbers in parse data so valid symbols are never of length 2 return build(builder, parser.expr(expr).totuple(1)[1]) PKź6|##dispatch/__init__.py"""Multiple/Predicate Dispatch Framework This framework refines the algorithms of Chambers and Chen in their 1999 paper, "Efficient Multiple and Predicate Dispatching", to make them suitable for Python, while adding a few other enhancements like incremental index building and lazy expansion of the dispatch DAG. Also, their algorithm was designed only for class selection and true/false tests, while this framework can be used with any kind of test, such as numeric ranges, or custom tests such as categorization/hierarchy membership. NOTE: this package is not yet ready for prime-time. APIs are subject to change randomly without notice. You have been warned! TODO * Support DAG-walking for visualization, debugging, and ambiguity detection """ from dispatch.interfaces import * from types import ClassType as _ClassType _cls = _ClassType,type def generic(combiner=None): """Use the following function as the skeleton for a generic function Decorate a Python function so that it wraps an instance of 'dispatch.functions.GenericFunction' that has been configured with the decorated function's name, docstring, argument signature, and default arguments. The decorated function will have additional attributes besides those of a normal function. (See 'dispatch.IGenericFunction' for more information on these special attributes/methods.) Most commonly, you will use the 'when()' method of the decorated function to define "rules" or "methods" of the generic function. For example:: import dispatch @dispatch.generic() def someFunction(*args): '''This is a generic function''' @someFunction.when("len(args)>0") def argsPassed(*args): print "Arguments were passed!" @someFunction.when("len(args)==0") def noArgsPassed(*args): print "No arguments were passed!" someFunction() # prints "No args passed" someFunction(1) # prints "args passed" Note that when using older Python versions, you must use '[dispatch.generic()]' instead of '@dispatch.generic()'. """ from dispatch.functions import GenericFunction, AbstractGeneric from peak.util.decorators import decorate_assignment if combiner is None: def callback(frm,name,value,old_locals): return GenericFunction(value).delegate elif isinstance(combiner,_cls) and issubclass(combiner,AbstractGeneric): def callback(frm,name,value,old_locals): return combiner(value).delegate else: def callback(frm,name,value,old_locals): gf = GenericFunction(value) gf.combine = combiner return gf.delegate return decorate_assignment(callback) def as(*decorators): """Use Python 2.4 decorators w/Python 2.2+ Example: import dispatch class Foo(object): [dispatch.as(classmethod)] def something(cls,etc): \"""This is a classmethod\""" """ if len(decorators)>1: decorators = list(decorators) decorators.reverse() def callback(frame,k,v,old_locals): for d in decorators: v = d(v) return v from peak.util.decorators import decorate_assignment return decorate_assignment(callback) def on(argument_name): """Decorate the following function as a single-dispatch generic function Single-dispatch generic functions may have a slight speed advantage over predicate-dispatch generic functions when you only need to dispatch based on a single argument's type or protocol, and do not need arbitrary predicates. Also, single-dispatch functions do not require you to adapt the dispatch argument when dispatching based on protocol or interface, and if the dispatch argument has a '__conform__' method, it will attempt to use it, rather than simply dispatching based on class information the way predicate dispatch functions do. The created generic function will use the documentation from the supplied function as its docstring. And, it will dispatch methods based on the argument named by 'argument_name', and otherwise keeping the same argument signature, defaults, etc. For example:: @dispatch.on('y') def doSomething(x,y,z): '''Doc for 'doSomething()' generic function goes here''' @doSomething.when([SomeClass,OtherClass]) def doSomething(x,y,z): # do something when 'isinstance(y,(SomeClass,OtherClass))' @doSomething.when(IFoo) def doSomething(x,y,z): # do something to a 'y' that has been adapted to 'IFoo' """ def callback(frm,name,value,old_locals): return _mkGeneric(value,argument_name) from dispatch.functions import _mkGeneric from peak.util.decorators import decorate_assignment return decorate_assignment(callback) PKź6j-_V??dispatch/combiners.py"""Method combining subclasses of Dispatcher and GenericFunction""" from strategy import ordered_signatures from interfaces import AmbiguousMethod from functions import Dispatcher class MapDispatcher(Dispatcher): """Abstract base class for method combiners that merge metadata To use this class, subclass it and override the 'getItems()' method (and optionally the 'shouldStop()' method to support your particular kind of metadata. Then use the subclass as a method combiner for a dispatcher or generic function. See 'combiners.txt' for sample code. """ def getItems(self,signature,method): """Return an iterable of '(key,value)' pairs for given rule""" raise NotImplementedError def shouldStop(self,signature,method): """Return truth if combining should stop at this precedence level""" def combine(self,items): """Build a dictionary from a sequence of '(signature,method)' pairs""" d = {} should_stop = False for level in ordered_signatures(items): current = {} for item in level: should_stop = should_stop or self.shouldStop(*item) for k,v in self.getItems(*item): if k in d: # already defined continue if k in current and current[k]<>v: return AmbiguousMethod(k,v,current[k]) current[k] = v d.update(current) if should_stop: break return d PKź6{hdispatch/strategy.py"""Indexing and Method Combination Strategies ProtocolCriterion -- Index handler for checking that an expression adapts to a protocol ClassCriterion -- Index handler for checking that an expression is of a type or class SubclassCriterion -- Index handler for checking that an expression is a subclass of a given class Inequality -- Index handler for checking that an expression has a range relation (i.e. <,>,<=,>=,==,!=) to a constant value IdentityCriterion -- Index handler for checking that an expression 'is' a particular object Min, Max -- Extreme values for use with 'Inequality' Pointer -- hashable/comparable object pointer, with weakref support, used to implement 'IdentityCriterion' Predicate, Signature, PositionalSignature, Argument -- primitives to implement indexable multiple dispatch predicates most_specific_signatures, ordered_signatures, method_chain, method_list, all_methods, safe_methods, separate_qualifiers -- utility functions for creating method combinations validateCriterion -- check if a criterion's implementation is sane """ from __future__ import generators from protocols import Protocol, Adapter, StickyAdapter from protocols.advice import getMRO import protocols, operator, inspect from types import ClassType, InstanceType ClassTypes = (ClassType, type) from sys import _getframe, maxint from weakref import WeakKeyDictionary, ref from dispatch.interfaces import * from new import instancemethod import dispatch __all__ = [ 'ProtocolCriterion', 'ClassCriterion', 'SubclassCriterion', 'Inequality', 'Min', 'Max', 'Predicate', 'Signature', 'PositionalSignature', 'Argument', 'most_specific_signatures', 'ordered_signatures', 'separate_qualifiers', 'method_chain', 'method_list', 'all_methods', 'safe_methods', 'Pointer', 'default', 'IdentityCriterion', 'NullCriterion', 'validateCriterion', 'DispatchNode', 'SeededIndex', ] rev_ops = { '>': '<=', '>=': '<', '=>': '<', '<': '>=', '<=': '>', '=<': '>', '<>': '==', '!=': '==', '==':'!=' } try: set except NameError: from sets import Set as set from sets import ImmutableSet as frozenset class SeededIndex(object): """Index connecting seeds and results""" __slots__ = ( 'dispatch_function', 'allSeeds', 'matchingSeeds', 'criteria', 'enumerables','impliedSeeds' ) def __init__(self,dispatch_function): self.dispatch_function = dispatch_function self.clear() def clear(self): """Reset index to empty""" self.allSeeds = {} # set of all seeds self.matchingSeeds = {} # case -> applicable seeds self.criteria = {} # criterion -> applicable seeds self.enumerables = {} # enumerable -> applicable seeds (reg'd) self.impliedSeeds = {} # enumerable -> applicable seeds (all) def __setitem__(self,criterion,case): """Register 'case' under each of the criterion's seeds""" if criterion.enumerable: enumerables = self.enumerables if criterion not in enumerables: seed = criterion.leaf_seed seeds = self.allSeeds new_seed = seed not in seeds impliedSeeds = self.impliedSeeds for cri in criterion.parent_criteria(): if new_seed or cri not in enumerables: impliedSeeds.setdefault(cri,[]).append(seed) enumerables[criterion] = impliedSeeds[criterion] add = self.addSeed for key in criterion.seeds(): if key not in seeds: add(key,False) self.matchingSeeds[case] = enumerables[criterion] return criteria = self.criteria if criterion not in criteria: seeds = self.allSeeds add = self.addSeed for key in criterion.seeds(): if key not in seeds: add(key) criteria[criterion] = list(criterion.matches(seeds)) self.matchingSeeds[case] = criteria[criterion] def count_for(self,cases): """Get the total count of outgoing branches, given incoming cases""" get = self.matchingSeeds.get dflt = self.allSeeds return len(self.allSeeds), sum([len(get(case,dflt)) for case in cases]) def casemap_for(self,cases): """Return a mapping from seeds->caselists for the given cases""" get = self.matchingSeeds.get dflt = self.allSeeds casemap = dict([(key,[]) for key in dflt]) for case in cases: for key in get(case,dflt): casemap[key].append(case) return casemap def addSeed(self,seed, reseed=True): """Add a previously-missing seed""" if seed in self.allSeeds: return # avoid duping entries if this is a reseed via dispatcher for criterion,itsSeeds in self.criteria.items(): if seed in criterion: itsSeeds.append(seed) if reseed: for criterion,itsSeeds in self.enumerables.items(): if seed in criterion: itsSeeds.append(seed) self.allSeeds[seed] = None def mkNode(self,*args): node = DispatchNode(*args) return instancemethod(self.dispatch_function,node,DispatchNode) class DispatchNode(dict): """A mapping w/lazy population and supporting 'reseed()' operations""" protocols.advise(instancesProvide=[IDispatchTable]) __slots__ = 'index','cases','build','lock' def __init__(self, index, cases, build, lock): self.index = index self.cases = cases self.build = build self.lock = lock dict.__init__( self, [(key,build(subcases)) for key,subcases in index.casemap_for(cases).items()] ) def reseed(self,key): self.lock.acquire() try: self.index.addSeed(key) self[key] = retval = self.build( self.index.casemap_for(self.cases)[key] ) return retval finally: self.lock.release() _memo = {} def make_node_type(dispatch_function): if dispatch_function not in _memo: class Node(DispatchNode): def make_index(cls): return SeededIndex(dispatch_function) make_index = classmethod(make_index) # XXX can't use disp.as! _memo[dispatch_function] = Node return _memo[dispatch_function] def validateCriterion(criterion, node_type, seeded=True, parents=None): """Does 'criterion' have a sane implementation?""" criterion = ICriterion(criterion) hash(criterion) assert criterion.node_type is node_type assert criterion==criterion, (criterion, "should equal itself") assert criterion!=NullCriterion,(criterion,"shouldn't equal NullCriterion") assert criterion!=~criterion,(criterion,"shouldn't equal its inverse") assert criterion==~~criterion,(criterion,"should equal its double-inverse") assert criterion.implies(NullCriterion), ( criterion,"should imply NullCriterion" ) assert criterion.implies(criterion), (criterion,"should imply itself") assert not (~criterion).implies(criterion), ( criterion,"should not be implied by its inverse" ) assert not criterion.implies(~criterion), ( criterion,"should not imply its inverse", ~criterion ) d = {} criterion.subscribe(d) criterion.unsubscribe(d) if seeded: criterion = ISeededCriterion(criterion) for seed in criterion.seeds(): d[seed] = seed in criterion matches = list(criterion.matches(d)) for seed in matches: assert d[seed], (criterion,"should have contained",seed) del d[seed] for value in d.values(): assert not value,(criterion,"should've included",seed,"in matches") _parents = set(criterion.parent_criteria()) if parents is not None: # check specific parent assertion assert _parents == set(parents), (_parents,set(parents)) elif criterion.enumerable: assert _parents, ( criterion,"is enumerable and so should have parents" ) assert criterion in _parents, ( criterion,"does not include itself in its parent criteria" ) assert _parents == set([criterion]), ( criterion,"has too many parents, or you should be specifying" " the `parents` argument to ``validateCriterion()``", _parents ) else: assert not _parents, ( criterion, "is not enumerable and so should not have parents", _parents ) for parent in _parents: assert criterion.leaf_seed in parent assert criterion.implies(parent) class TGraph: """Simple transitive dependency graph""" def __init__(self): self.data = {} def add(self,s,e): self.data.setdefault(s,{}) for old_s,old_es in self.data.items(): if s in old_es or s is old_s: g = self.data.setdefault(old_s,{}) g[e] = 1 for ee in self.data.get(e,()): g[ee] = 1 def items(self): """List of current edges""" return [(s,e) for s in self.data for e in self.data[s]] def successors(self,items): """Return a truth map of the acyclic sucessors of 'items'""" d = {} get = self.data.get for s in items: for e in get(s,()): if s not in get(e,()): d[e] = 1 return d def dispatch_by_mro(table,ob): """Lookup '__class__' of 'ob' in 'table' using its MRO order""" try: klass = ob.__class__ except AttributeError: klass = type(ob) while True: if klass in table: return table[klass] try: klass, = klass.__bases__ except ValueError: if klass.__bases__: # Fixup for multiple inheritance return table.reseed(klass) else: break if isinstance(ob,InstanceType) and InstanceType in table: return table[InstanceType] if klass is not object and object in table: return table[object] def dispatch_by_subclass(table,ob): if isinstance(ob,ClassTypes): while 1: if ob in table: return table[ob] try: ob, = ob.__bases__ except ValueError: if ob.__bases__: return table.reseed(ob) break return table[None] class AbstractCriterion(object): """Common behaviors for typical criteria""" class __metaclass__(type): def __init__(cls,name,bases,cdict): if 'dispatch_function' in cdict: cls.node_type = make_node_type(cls.dispatch_function) __slots__ = 'hash','subject' enumerable = True protocols.advise(instancesProvide=[ISeededCriterion]) def __init__(self,subject=None): self.subject = subject self.hash = hash(subject) def __eq__(self,other): return type(self) is type(other) and self.subject==other.subject def __ne__(self,other): return not self==other def matches(self,table): for key in table: if key in self: yield key def __invert__(self): from predicates import NotCriterion return NotCriterion(self) def subscribe(self,listener): pass def unsubscribe(self,listener): pass def __contains__(self,key): raise NotImplementedError def __and__(self,other): from predicates import AndCriterion return AndCriterion(self,other) def implies(self,other): other = ISeededCriterion(other) if self.enumerable: return self.leaf_seed in other for seed in self.seeds(): if seed in self and seed not in other: return False for seed in other.seeds(): if seed in self and seed not in other: return False return True def parent_criteria(self): if self.enumerable: yield self # Alias subject -> leaf_seed by default AbstractCriterion.leaf_seed = AbstractCriterion.subject # Hack to make hashing faster, by bypassing function call and attr lookup AbstractCriterion.__hash__ = instancemethod( AbstractCriterion.hash.__get__, None, AbstractCriterion ) try: from _d_speedups import dispatch_by_mro except ImportError: pass class ClassCriterion(AbstractCriterion): """Criterion that indicates expr is of a particular class""" __slots__ = () protocols.advise( instancesProvide=[ISeededCriterion], asAdapterForTypes=ClassTypes ) dispatch_function = staticmethod(dispatch_by_mro) def __init__(self,cls): AbstractCriterion.__init__(self,cls) def seeds(self): return [self.subject,object] def __contains__(self,ob): if isinstance(ob,ClassTypes): return ( self.subject is object or issubclass(ob,self.subject) or (self.subject is InstanceType and isinstance(ob,ClassType)) ) return False def __repr__(self): return self.subject.__name__ def parent_criteria(self): for cls in getMRO(self.subject,True): yield self.__class__(cls) _guard = object() class Pointer(int): __slots__ = 'ref' def __new__(cls,ob): self = Pointer.__base__.__new__(cls,id(ob)&maxint) try: self.ref=ref(ob) except TypeError: self.ref=lambda ob=ob: ob return self def __eq__(self,other): return self is other or int(self)==other and self.ref() is not None def __repr__(self): if self.ref() is None: return "Pointer()" % hex(self) else: return "Pointer(%r)" % self.ref() def dispatch_by_identity(table,ob): oid = id(ob)&maxint if oid in table: return table[oid] return table[None] class IdentityCriterion(AbstractCriterion): """Criterion that is true when target object is the same""" protocols.advise( instancesProvide=[ISeededCriterion],asAdapterForTypes=[Pointer] ) __slots__ = () dispatch_function = staticmethod(dispatch_by_identity) def __init__(self,ptr): AbstractCriterion.__init__(self,ptr) def seeds(self): return None,self.subject def matches(self,table): return self.subject, def __contains__(self,ob): return ob==self.subject def __repr__(self): return `self.subject` class NullCriterion(AbstractCriterion): """A "wildcard" Criterion that is always true""" node_type = None def seeds(self): return () def __contains__(self,ob): return True def implies(self,other): return False def __repr__(self): return "NullCriterion" def matches(self,table): return list(table) NullCriterion = NullCriterion() class SubclassCriterion(ClassCriterion): """Criterion that indicates expr is a subclass of a particular class""" __slots__ = () dispatch_function = staticmethod(dispatch_by_subclass) def seeds(self): return [self.subject,None] def __repr__(self): return "SubclassCriterion(%s)" % (self.subject.__name__,) class _ExtremeType(object): # Courtesy of PEP 326 def __init__(self, cmpr, rep): object.__init__(self) self._cmpr = cmpr self._rep = rep def __cmp__(self, other): if isinstance(other, self.__class__) and\ other._cmpr == self._cmpr: return 0 return self._cmpr def __repr__(self): return self._rep def __lt__(self,other): return self.__cmp__(other)<0 def __le__(self,other): return self.__cmp__(other)<=0 def __gt__(self,other): return self.__cmp__(other)>0 def __eq__(self,other): return self.__cmp__(other)==0 def __ge__(self,other): return self.__cmp__(other)>=0 def __ne__(self,other): return self.__cmp__(other)<>0 Max = _ExtremeType(1, "Max") Min = _ExtremeType(-1, "Min") def dispatch_by_inequalities(table,ob): key = ob,ob try: return table[key] except KeyError: if None not in table: table[None] = ranges = concatenate_ranges(table) else: ranges = table[None] lo = 0; hi = len(ranges) while loth: lo = mid+1 else: return table[ranges[mid]] def concatenate_ranges(range_map): ranges = range_map.keys(); ranges.sort() output = [] last = Min for (l,h) in ranges: if l applicable seeds self.last_cases = None self.last_out = None def __setitem__(self,criterion,case): """Register 'case' under each of the criterion's seeds""" self.criteria[case] = criterion for (lo,hi) in criterion.ranges: self.allSeeds[lo] = self.allSeeds[hi] = None def addSeed(self,seed): raise NotImplementedError def casemap_for(self,cases): """Return a mapping from seeds->caselists for the given cases""" if cases is self.last_cases or cases==self.last_cases: return self.last_out tmp = {} out = {} get = self.criteria.get all = Inequality('..',[(Min,Max)]) have_ineq = False for case in cases: for (lo,hi) in get(case,all).ranges: if lo not in tmp: tmp[lo] = [],[],[] if lo==hi: tmp[lo][2].append(case) else: have_ineq = True if hi not in tmp: tmp[hi] = [],[],[] tmp[lo][0].append(case) if hi is not Max: tmp[hi][1].append(case) if have_ineq: keys = tmp.keys() keys.sort() current = frozenset(tmp.get(Min,[[]])[0]) hi = Min for val in keys: add,remove,eq = tmp[val] lo,hi = hi,val out[lo,hi] = current current = current.difference(remove) out[val,val] = current.union(eq) current = current.union(add) else: out[Min,Max] = [] # default for val,(add,remove,eq) in tmp.items(): out[val,val] = eq self.last_out = out self.last_cases = cases return out class Inequality(object): """Criterion that indicates target matches specified const. inequalities""" __slots__ = 'hash','ranges' protocols.advise(instancesProvide=[ICriterion]) class node_type(DispatchNode): make_index = InequalityIndex def __init__(self,op,val): ranges = [] if op=='..': ranges.extend(val) else: if op=='!=': op = '<>' # easier to process this way if '<' in op: ranges.append((Min,val)) if '=' in op: ranges.append((val,val)) if '>' in op: ranges.append((val,Max)) if not ranges or [c for c in op if c not in '<=>']: raise ValueError("Invalid inequality operator", op) self.ranges = ranges = tuple(ranges) self.hash = hash(ranges) def implies(self,other): for r in self.ranges: if not r in other: return False return True def __repr__(self): return 'Inequality("..",%r)' % (self.ranges,) def __contains__(self,ob): for r in self.ranges: if ob==r: return True elif ob[0]==ob[1]: # single point must be *inside* the range if ob[0]>r[0] and ob[1]=r[0] and ob[1]<=r[1]: # for range, overlap allowed return True return False def __and__(self, other): ranges = [] for r in self.ranges: if r in other: ranges.append(r) else: l1, h1 = r for l2,h2 in other.ranges: if l2>h1 or h2Min and (last,lo) not in self: ranges[last,lo] = 1 last = hi if lo<>hi and lo<>Min and (lo,lo) not in self: ranges[lo,lo] = 1 if last is not Max: if (last,last) not in self: ranges[last,last]=1 ranges[last,Max]=1 ranges = list(ranges); ranges.sort() return self.__class__('..',ranges) # Hack to make hashing faster, by bypassing function call and attr lookup Inequality.__hash__ = instancemethod( Inequality.hash.__get__, None, Inequality ) class _Notifier(Protocol): """Helper class that forwards class registration info""" def __init__(self,baseProto): Protocol.__init__(self) from weakref import WeakKeyDictionary self.__subscribers = WeakKeyDictionary() baseProto.addImpliedProtocol(self, protocols.NO_ADAPTER_NEEDED, 1) def subscribe(self,listener): self._Protocol__lock.acquire() try: self.__subscribers[listener] = 1 finally: self._Protocol__lock.release() def unsubscribe(self,listener): self._Protocol__lock.acquire() try: if listener in self.__subscribers: del self.__subscriber[listener] finally: self._Protocol__lock.release() def registerImplementation(self,klass,adapter=protocols.NO_ADAPTER_NEEDED,depth=1): old_reg = Protocol.registerImplementation.__get__(self,self.__class__) result = old_reg(klass,adapter,depth) self._Protocol__lock.acquire() try: if self.__subscribers: for subscriber in self.__subscribers.keys(): subscriber.criterionChanged() finally: self._Protocol__lock.release() return result class ProtocolCriterion(StickyAdapter,AbstractCriterion): """Criterion that indicates instances of expr's class provide a protocol""" protocols.advise( instancesProvide=[ISeededCriterion], asAdapterForTypes=[Protocol] ) attachForProtocols = (ICriterion,ISeededCriterion) dispatch_function = staticmethod(dispatch_by_mro) enumerable = False # Just to be safe def __init__(self,ob): self.notifier = _Notifier(ob) StickyAdapter.__init__(self,ob) AbstractCriterion.__init__(self,ob) def subscribe(self,listener): self.notifier.subscribe(listener) def unsubscribe(self,listener): self.notifier.unsubscribe(listener) def seeds(self): return self.notifier._Protocol__adapters.keys() + [object] def __contains__(self,ob): if isinstance(ob,ClassTypes): bases = self.subject._Protocol__adapters for base in getMRO(ob,True): if base in bases: return bases[base][0] is not protocols.DOES_NOT_SUPPORT return False def implies(self,other): other = ISeededCriterion(other) if other is NullCriterion: return True for base in self.notifier._Protocol__adapters.keys(): if base not in other: return False return True def __repr__(self): return self.subject.__name__ def most_specific_signatures(cases): """List the most specific '(signature,method)' pairs from 'cases' 'cases' is a list of '(signature,method)' pairs, where each 'signature' must provide 'ISignature'. This routine checks the implication relationships between pairs of signatures, and then returns a shorter list of '(signature,method)' pairs such that no other signature from the original list implies a signature in the new list. """ if len(cases)==1: # Shortcut for common case return list(cases) best, rest = list(cases[:1]), list(cases[1:]) for new_sig,new_meth in rest: for old_sig, old_meth in best[:]: # copy so we can modify inplace new_implies_old = new_sig.implies(old_sig) old_implies_new = old_sig.implies(new_sig) if new_implies_old: if not old_implies_new: # better, remove the old one best.remove((old_sig, old_meth)) elif old_implies_new: # worse, skip adding the new one break else: # new_sig has passed the gauntlet, as it has not been implied # by any of the current "best" items best.append((new_sig,new_meth)) return best def ordered_signatures(cases): """Return list of lists of cases sorted into partial implication order Each list within the returned list contains cases whose signatures are overlapping, equivalent, or disjoint with one another, but are more specific than any other case in the lists that follow.""" rest = list(cases) while rest: best = most_specific_signatures(rest) map(rest.remove,best) yield best def all_methods(grouped_cases): """Yield all methods in 'grouped_cases'""" for group in grouped_cases: for signature,method in group: yield method def safe_methods(grouped_cases): """Yield non-ambiguous methods (plus optional raiser of AmbiguousMethod)""" for group in grouped_cases: if len(group)>1: yield AmbiguousMethod(group) break for signature,method in group: yield method def method_list(methods): """Return callable that yields results of calling 'methods' w/same args""" methods = list(methods) # ensure it's re-iterable def combined(*args,**kw): for m in methods: yield m(*args,**kw) return combined def method_chain(methods): """Chain 'methods' such that each may call the next""" methods = iter(methods) # ensure that nested calls will see only the tail for method in methods: try: args = inspect.getargspec(method)[0] except TypeError: return method # not a function, therefore not chainable if args and args[0]=='next_method': if getattr(method,'im_self',None) is None: next_method = method_chain(methods) return instancemethod(method,next_method,type(next_method)) return method return NoApplicableMethods() def single_best(cases): for method in safe_methods(ordered_signatures(cases)): return method else: return NoApplicableMethods() def separate_qualifiers(qualified_cases, **postprocessors): """list[qualified_case] -> dict[qualifier:list[unqualified_case]] Turn a list of cases with possibly-qualified methods into a dictionary mapping qualifiers to (possibly post-processed) case lists. If a given method is not qualified, it's treated as though it had the qualifier '"primary"'. Keyword arguments supplied to this function are treated as a mapping from qualifiers to lists of functions that should be applied to the list of cases to that qualifier. So, for example, this:: cases = separate_qualifiers(cases, primary=[strategy.ordered_signatures,strategy.safe_methods], ) is equivalent to:: cases = separate_qualifiers(cases) if "primary" in cases: cases["primary"]=safe_methods(ordered_signatures(cases["primary"])) Notice, by the way, that the postprocessing functions must be listed in order of *application* (i.e. outermost last). """ cases = {} for signature,method in qualified_cases: if isinstance(method,tuple): qualifier,method = method else: qualifier="primary" cases.setdefault(qualifier,[]).append((signature,method)) for k,v in cases.items(): if k in postprocessors: for p in postprocessors[k]: v = p(v) cases[k] = v return cases class ExprBase(object): protocols.advise(instancesProvide=[IDispatchableExpression]) def __ne__(self,other): return not self.__eq__(other) def __hash__(self): return self.hash def asFuncAndIds(self,generic): raise NotImplementedError class Argument(ExprBase): """The most basic kind of dispatch expression: an argument specifier""" def __init__(self,pos=None,name=None): if pos is None and name is None: raise ValueError("Argument name or position must be specified") self.pos = pos self.name = name self.hash = hash((type(self),self.pos,self.name)) def __eq__(self,other): return isinstance(other,Argument) and \ (other.pos==self.pos) and \ (other.name==self.name) def asFuncAndIds(self,generic): raise NameError("%r is not known to %s" % (self,generic)) def __repr__(self): if self.name: return self.name return 'Argument(%r)' % self.pos class Predicate(object): """A set of alternative signatures in disjunctive normal form""" protocols.advise( instancesProvide=[IDispatchPredicate], asAdapterForProtocols = [protocols.sequenceOf(ISignature)], ) def __init__(self,items): self.items = all = [] for item in map(ISignature,items): if item not in all: all.append(item) def __iter__(self): return iter(self.items) def __and__(self,other): return Predicate( [(a & b) for a in self for b in IDispatchPredicate(other)]) def __or__(self,other): sig = ISignature(other,None) if sig is not None: if len(self.items)==1: return self.items[0] | sig return Predicate(self.items+[sig]) return Predicate(list(self)+list(other)) def __eq__(self,other): return self is other or self.items==list(other) def __ne__(self,other): return not self.__eq__(other) def __repr__(self): return `self.items` protocols.declareAdapter( lambda ob: Predicate([ob]), [IDispatchPredicate], forProtocols=[ISignature] ) class Signature(object): """A set of criteria (in conjunctive normal form) applied to expressions""" protocols.advise(instancesProvide=[ISignature]) __slots__ = 'data','keys' def __init__(self, __id_to_test=(), **kw): items = list(__id_to_test)+[(Argument(name=k),v) for k,v in kw.items()] self.data = data = {}; self.keys = keys = [] for k,v in items: v = ICriterion(v) k = k,v.node_type if k in data: data[k] &= v else: data[k] = v; keys.append(k) def implies(self,otherSig): otherSig = ISignature(otherSig) for expr_id,otherCriterion in otherSig.items(): if not self.get(expr_id).implies(otherCriterion): return False return True def items(self): return [(k,self.data[k]) for k in self.keys] def get(self,expr_id): return self.data.get(expr_id,NullCriterion) def __repr__(self): return 'Signature(%s)' % (','.join( [('%r=%r' % (k,v)) for k,v in self.data.items()] ),) def __and__(self,other): me = self.data.items() if not me: return other if IDispatchPredicate(other) is other: return Predicate([self]) & other they = ISignature(other).items() if not they: return self return Signature( [(k[0],self.data[k]) for k in self.keys] + [(k,v) for (k,d),v in they] ) def __or__(self,other): me = self.data.items() if not me: return self # Always true if IDispatchPredicate(other) is other: return Predicate([self]) | other they = ISignature(other).items() if not they: return other # Always true return Predicate([self,other]) def __eq__(self,other): if other is self: return True other = ISignature(other,None) if other is None or other is NullCriterion: return False for k,v in self.items(): if v!=other.get(k): return False for k,v in other.items(): if v!=self.get(k): return False return True def __ne__(self,other): return not self.__eq__(other) class PositionalSignature(Signature): protocols.advise( instancesProvide=[ISignature], asAdapterForProtocols=[protocols.sequenceOf(ICriterion)] ) __slots__ = () def __init__(self,criteria,proto=None): Signature.__init__(self, zip(map(Argument,range(len(criteria))), criteria) ) default = Signature() PKź6[Ÿdispatch/interfaces.pyfrom protocols import Interface, Attribute __all__ = [ 'IDispatchFunction', 'ICriterion', 'ISignature', 'IDispatchPredicate', 'IDispatcher', 'AmbiguousMethod', 'NoApplicableMethods', 'IDispatchableExpression', 'IGenericFunction', 'IDispatchTable', 'EXPR_GETTER_ID','IExtensibleFunction', 'DispatchError', 'ISeededCriterion', ] class DispatchError(Exception): """A dispatch error has occurred""" def __call__(self,*args,**kw): raise self.__class__(*self.args+(args,kw)) def __repr__(self): # This method is needed so doctests for 2.3/2.4 match 2.5 return self.__class__.__name__+repr(self.args) class AmbiguousMethod(DispatchError): """More than one choice of method is possible""" class NoApplicableMethods(DispatchError): """No applicable method has been defined for the given arguments""" EXPR_GETTER_ID = -1 class ICriterion(Interface): """A criterion to be applied to an expression A criterion comprises a "node type" (that determines how the criterion will be checked, such as an 'isinstance()' check or range comparison) and a value or values that the expression must match. Note that a criterion describes only the check(s) to be performed, not the expression to be checked.""" node_type = Attribute( """The type of object that will actually do the dispatching""" ) def __and__(other): """Apply multiple criteria of the same node type to the same expr""" def __hash__(): """Equal criteria should have equal hashes""" def __eq__(other): """Return true if equal""" def __ne__(other): """Return false if equal""" def __invert__(): """Return an inverse version of this criterion (i.e. '~criterion')""" def implies(other): """Return true if truth of this criterion implies truth of 'other'""" def subscribe(listener): """Call 'listener.criterionChanged()' if applicability changes Multiple calls with the same listener should be treated as a no-op.""" def unsubscribe(listener): """Stop calling 'listener.criterionChanged()' Unsubscribing a listener that was not subscribed should be a no-op.""" class ISeededCriterion(ICriterion): """A criterion that works with a SeededIndex""" enumerable = Attribute( """Can criterion enumerate its implications via ``parent_seeds()``?""" ) leaf_seed = Attribute( """If ``enumerable``, this must be the most-specific parent seed""" ) def seeds(): """Return iterable of known-good keys The keys returned will be used to build outgoing edges in generic functions' dispatch tables, which will be passed to the 'dispatch_function' for interpretation.""" def __contains__(key): """Return true if criterion is true for 'key' This method will be passed each seed provided by this or any other criteria with the same 'dispatch_function' that are being applied to the same expression.""" def matches(table): """Return iterable of keys from 'table' that this criterion matches""" def parent_criteria(): """Iterable of all the criteria implied by this criterions""" class IDispatchFunction(Interface): """Determine what path to take at a dispatch node, given an expression""" def __call__(table,ob): """Return entry from 'table' that matches 'ob' ('None' if not found) 'table' is an 'IDispatchTable' mapping criterion seeds to dispatch nodes. The dispatch function should return the appropriate entry from the dictionary.""" def __eq__(other): """Return true if equal""" def __ne__(other): """Return false if equal""" def __hash__(): """Return hashcode""" class IDispatchTable(Interface): """A dispatch node for dispatch functions to search""" def __contains__(key): """True if 'key' is in table""" def __getitem__(key): """Return dispatch node for 'key', or raise 'KeyError'""" def reseed(key): """Add 'key' to dispatch table and return the node it should have""" class ISignature(Interface): """Ordered mapping from expression id -> criterion that should be applied Note that signatures do not/should not interpret expression IDs; the IDs may be any object that can be used as a dictionary key. """ def items(): """Sequence of '((id,disp_func),criterion)' pairs for this signature""" def get(expr_id): """Return this signature's 'ICriterion' for 'expr_id'""" def implies(otherSig): """Return true if this signature implies 'otherSig'""" def __eq__(other): """Return true if equal""" def __ne__(other): """Return false if equal""" class IDispatchPredicate(Interface): """Sequence of "or"-ed signatures""" def __iter__(): """Iterate over "or"-ed signatures""" def __eq__(other): """Return true if equal""" def __ne__(other): """Return false if equal""" class IDispatchableExpression(Interface): """Expression definition suitable for dispatching""" def asFuncAndIds(generic): """Return '(func,idtuple)' pair for expression computation""" def __eq__(other): """Return true if equal""" def __ne__(other): """Return false if equal""" def __hash__(): """Return hashcode""" class IDispatcher(Interface): """Multi-dispatch mapping object""" def __getitem__(argtuple): """Return the rule body (or combo thereof) that matches 'argtuple'""" def __setitem__(signature,body): """Store 'body' as the rule body for arg tuples matching 'signature'""" def parse(expr_string, local_dict, global_dict): """Parse 'expr_string' --> ISignature or IDispatchPredicate""" def getExpressionId(expr): """Return an expression ID for use in 'asFuncAndIds()' 'idtuple' Note that the constant 'EXPR_GETTER_ID' may be used in place of calling This method, if you want the ID corresponding to a function that will return the value of any other expression whose ID is passed to it.""" def criterionChanged(): """Notify that a criterion has changed meaning, invalidating indexes""" def clear(): """Empty all signatures, methods, criteria, expressions, etc.""" class IExtensibleFunction(Interface): def __call__(*__args,**__kw): """Invoke the function and return results""" def addMethod(predicate,method): """Call 'method' when input matches 'predicate' (Note that single and multiple-dispatch functions use different predicate types: type/class/sequence vs. 'IDispatchPredicate'). """ def when(cond): """Add following function to this GF, w/'cond' as a guard This is used to add a method to a generic function. E.g.:: import foo @foo.barFunc.when(XYZ) def whatever(x,y,z): # code for situation XYZ After the execution of this alternate form, 'whatever' will be bound to the 'whatever' function as shown, but it will also have been added to 'foo.barFunc' under condition 'XYZ'. """ class IGenericFunction(IExtensibleFunction, IDispatcher): """Extensible function that stores methods in an IDispatcher""" # copy() ? PKź6* XLXLdispatch/predicates.pyfrom __future__ import generators from dispatch import * from dispatch.strategy import Inequality, Signature, ExprBase, default from dispatch.strategy import SubclassCriterion, NullCriterion from dispatch.strategy import AbstractCriterion, Pointer, Predicate from dispatch.ast_builder import build import protocols, operator, dispatch from types import NoneType __all__ = [ 'Call', 'AndCriterion', 'NotCriterion', 'TruthCriterion', 'ExprBuilder', 'Const', 'Getattr', 'Tuple', 'dispatch_by_truth', 'OrExpr', 'AndExpr', 'CriteriaBuilder', 'expressionSignature', 'IfElse', ] # Helper functions for operations not supplied by the 'operator' module def is_(o1,o2): return o1 is o2 def in_(o1,o2): return o1 in o2 def is_not(o1,o2): return o1 is not o2 def not_in(o1,o2): return o1 not in o2 def add_dict(d1,d2): d1 = d1.copy() d1.update(d2) return d1 try: frozenset except NameError: from sets import ImmutableSet as frozenset # XXX var, let, ??? class ExprBuilder: simplify_comparisons = True def __init__(self,arguments,*namespaces): self.arguments = arguments self.namespaces = namespaces def Name(self,name): if name in self.arguments: return self.arguments[name] for ns in self.namespaces: if name in ns: return Const(ns[name]) raise NameError(name) def Const(self,value): return Const(value) _cmp_ops = { '>': operator.gt, '>=': operator.ge, '<': operator.lt, '<=': operator.le, '<>': operator.ne, '!=': operator.ne, '==':operator.eq, 'in': in_, 'not in': not_in, 'is': is_, 'is not': is_not } def Compare(self,initExpr,((op,other),)): return Call( self._cmp_ops[op], build(self,initExpr), build(self,other) ) def IfElse(self, tval, cond, fval): return IfElse(build(self,tval), build(self,cond), build(self,fval)) def mkBinOp(op): def method(self,left,right): return Call(op, build(self,left), build(self,right)) return method LeftShift = mkBinOp(operator.lshift) Power = mkBinOp(pow) RightShift = mkBinOp(operator.rshift) Add = mkBinOp(operator.add) Sub = mkBinOp(operator.sub) Mul = mkBinOp(operator.mul) Div = mkBinOp(operator.div) Mod = mkBinOp(operator.mod) FloorDiv = mkBinOp(operator.floordiv) def multiOp(op): def method(self,items): result = build(self,items[0]) for item in items[1:]: result = Call(op, result, build(self,item)) return result return method Bitor = multiOp(operator.or_) Bitxor = multiOp(operator.xor) Bitand = multiOp(operator.and_) def unaryOp(op): def method(self,expr): return Call(op, build(self,expr)) return method UnaryPlus = unaryOp(operator.pos) UnaryMinus = unaryOp(operator.neg) Invert = unaryOp(operator.invert) Backquote = unaryOp(repr) Not = unaryOp(operator.not_) def tupleOp(op): def method(self,items): return Tuple(op,*[build(self,item) for item in items]) return method Tuple = tupleOp(tuple) List = tupleOp(list) def Dict(self, items): keys = Tuple(tuple, *[build(self,k) for k,v in items]) vals = Tuple(tuple, *[build(self,v) for k,v in items]) return Call(dict, Call(zip, keys, vals)) def Subscript(self,left,right): left, right = build(self,left), build(self,right) if isinstance(right,tuple): return Call(operator.getslice,left,*right) else: return Call(operator.getitem,left,right) def Slice(self,start,stop): return build(self,start), build(self,stop) def Sliceobj(self,*args): return Call(slice,*[build(self,arg) for arg in args]) def Getattr(self,expr,attr): expr = build(self,expr) if isinstance(expr,Const): return Const(getattr(expr.value,attr)) return Getattr(expr,attr) def And(self,items): return AndExpr(*[build(self,expr) for expr in items]) def Or(self,items): return OrExpr(*[build(self,expr) for expr in items]) def CallFunc(self,funcExpr,args,kw,star,dstar): func = build(self,funcExpr) if isinstance(func,Const) and not kw and not star and not dstar: return Call(func.value, *[build(self,arg) for arg in args]) elif kw or dstar or args or star: if args: args = Tuple(tuple,*[build(self,arg) for arg in args]) if star: args = Call( operator.add, args, Call(tuple,build(self,star)) ) elif star: args = build(self,star) if kw or dstar: args = args or Const(()) if kw: kw = self.Dict(kw) if dstar: kw = Call(add_dict, kw, build(self,dstar)) elif dstar: kw = build(self,dstar) return Call(apply, func, args, kw) else: return Call(apply, func, args) else: return Call(apply,func) class LogicalExpr(ExprBase): def __new__(klass,*argexprs): for arg in argexprs: if not isinstance(arg,Const): return ExprBase.__new__(klass,*argexprs) return Const(klass.immediate([arg.value for arg in argexprs])) def __init__(self,*argexprs): self.argexprs = argexprs self.hash = hash((type(self),argexprs)) def __eq__(self,other): return type(self) is type(other) and other.argexprs == self.argexprs class OrExpr(LogicalExpr): """Lazily compute logical 'or' of exprs""" def asFuncAndIds(self,generic): argIds = map(generic.getExpressionId,self.argexprs) def or_(get): for arg in argIds: val = get(arg) if val: break return val return or_, (EXPR_GETTER_ID,) [as(classmethod)] def immediate(klass,seq): for item in seq: if item: break return item class AndExpr(LogicalExpr): """Lazily compute logical 'and' of exprs""" def asFuncAndIds(self,generic): argIds = map(generic.getExpressionId,self.argexprs) def and_(get): for arg in argIds: val = get(arg) if not val: break return val return and_, (EXPR_GETTER_ID,) [as(classmethod)] def immediate(klass,seq): for item in seq: if not item: break return item class IfElse(LogicalExpr): """Python 2.5 conditional expression""" def asFuncAndIds(self,generic): argIds = map(generic.getExpressionId, self.argexprs) def ifelse(get): if get(argIds[1]): return get(argIds[0]) return get(argIds[2]) return ifelse, (EXPR_GETTER_ID,) [as(classmethod)] def immediate(klass,seq): if seq[1]: return seq[0] return seq[2] class Tuple(ExprBase): """Compute an expression by calling a function with an argument tuple""" def __new__(klass,function=tuple,*argexprs): for arg in argexprs: if not isinstance(arg,Const): return ExprBase.__new__(klass,function,*argexprs) return Const(function([arg.value for arg in argexprs])) def __init__(self,function=tuple,*argexprs): self.function = function self.argexprs = argexprs self.hash = hash((type(self),function,argexprs)) def __eq__(self,other): return isinstance(other,Tuple) and \ (other.function==self.function) and \ (other.argexprs==self.argexprs) def asFuncAndIds(self,generic): return lambda *args: self.function(args), tuple( map(generic.getExpressionId, self.argexprs) ) def __repr__(self): return 'Tuple%r' % (((self.function,)+self.argexprs),) class Getattr(ExprBase): """Compute an expression by calling a function with 0 or more arguments""" def __init__(self,ob_expr,attr_name): self.ob_expr = ob_expr self.attr_name = attr_name self.hash = hash((type(self),ob_expr,attr_name)) def __eq__(self,other): return isinstance(other,Getattr) and \ (other.ob_expr==self.ob_expr) and \ (other.attr_name==self.attr_name) def asFuncAndIds(self,generic): return eval("lambda ob: ob.%s" % self.attr_name), ( generic.getExpressionId(self.ob_expr), ) class Const(ExprBase): """Compute a 'constant' value""" def __init__(self,value): self.value = value try: self.hash = hash((type(self),value)) except TypeError: self.hash = hash((type(self),id(value))) def __eq__(self,other): return isinstance(other,Const) and (other.value==self.value) def asFuncAndIds(self,generic): return lambda:self.value,() def __repr__(self): return 'Const(%r)' % (self.value,) class Call(ExprBase): """Compute an expression by calling a function with 0 or more arguments""" def __new__(klass,function,*argexprs): if not argexprs: return ExprBase.__new__(klass,function) for arg in argexprs: if not isinstance(arg,Const): return ExprBase.__new__(klass,function,*argexprs) return Const(function(*[arg.value for arg in argexprs])) def __init__(self,function,*argexprs): self.function = function self.argexprs = argexprs self.hash = hash((type(self),function,argexprs)) def __eq__(self,other): return isinstance(other,Call) and \ (other.function==self.function) and \ (other.argexprs==self.argexprs) def asFuncAndIds(self,generic): return self.function,tuple(map(generic.getExpressionId, self.argexprs)) def __repr__(self): return 'Call%r' % (((self.function,)+self.argexprs),) class MultiCriterion(AbstractCriterion): """Abstract base for boolean combinations of criteria""" __slots__ = 'node_type' criteria = AbstractCriterion.subject # alias criteria <-> subject enumerable = False def __new__(klass,*criteria): criteria, all = map(ISeededCriterion,criteria), [] nt = criteria[0].node_type for c in criteria: if c.node_type is not nt: raise ValueError("Mismatched dispatch types", criteria) if c.__class__ is klass: # flatten nested criteria all.extend([c for c in c.criteria if c not in all]) elif c not in all: all.append(c) if len(all)==1: return all[0] self = object.__new__(klass) self.node_type = nt AbstractCriterion.__init__(self,frozenset(all)) return self def seeds(self): seeds = {} for criterion in self.criteria: for seed in criterion.seeds(): seeds[seed]=True return seeds.keys() def subscribe(self,listener): for criterion in self.criteria: criterion.subscribe(listener) def unsubscribe(listener): for criterion in self.criteria: criterion.unsubscribe(listener) def __repr__(self): return '%s%r' % (self.__class__.__name__,tuple(self.criteria)) def __invert__(self): raise NotImplementedError def __init__(self,*criteria): pass class AndCriterion(MultiCriterion): """All criteria must return true for expression""" __slots__ = () def __contains__(self,key): for criterion in self.criteria: if key not in criterion: return False return True class NotCriterion(MultiCriterion): __slots__ = () __new__ = AbstractCriterion.__new__ def __init__(self, criterion): criterion = ISeededCriterion(criterion) AbstractCriterion.__init__(self,(criterion,)) self.node_type = criterion.node_type def __invert__(self): return self.criteria[0] def __contains__(self,key): return key not in self.criteria[0] def dispatch_by_truth(table,ob): return table.get(bool(ob)) class TruthCriterion(AbstractCriterion): """Criterion representing truth or falsity of an expression""" __slots__ = () truth = AbstractCriterion.subject dispatch_function = staticmethod(dispatch_by_truth) def __init__(self,truth=True): AbstractCriterion.__init__(self,bool(truth)) def seeds(self): return True,False def __contains__(self,key): return key==self.truth def __invert__(self): return TruthCriterion(not self.truth) def matches(self,table): return self.truth, def __repr__(self): return "TruthCriterion(%r)" % self.truth [dispatch.generic()] def expressionSignature(expr,criterion): """Return an ISignature that applies 'criterion' to 'expr'""" [expressionSignature.when(default)] def expressionSignature(expr,criterion): return Signature([(expr,criterion)]) class CriteriaBuilder: bind_globals = True simplify_comparisons = True mode = TruthCriterion(True) def __init__(self,arguments,*namespaces): self.expr_builder = ExprBuilder(arguments,*namespaces) def mkOp(name): op = getattr(ExprBuilder,name) def method(self,*args): return expressionSignature(op(self.expr_builder,*args), self.mode) return method for opname in dir(ExprBuilder): if opname[0].isalpha() and opname[0]==opname[0].upper(): locals()[opname] = mkOp(opname) def Not(self,expr): try: self.__class__ = NotBuilder return build(self,expr) finally: self.__class__ = CriteriaBuilder _mirror_ops = { '>': '<', '>=': '<=', '=>':'<=', '<': '>', '<=': '>=', '=<':'>=', '<>': '<>', '!=': '<>', '==':'==', 'is': 'is', 'is not': 'is not' } _rev_ops = { '>': '<=', '>=': '<', '=>': '<', '<': '>=', '<=': '>', '=<': '>', '<>': '==', '!=': '==', '==':'!=', 'in': 'not in', 'not in': 'in', 'is': 'is not', 'is not': 'is' } def Compare(self,initExpr,((op,other),)): left = build(self.expr_builder,initExpr) right = build(self.expr_builder,other) if isinstance(left,Const) and op in self._mirror_ops: left,right,op = right,left,self._mirror_ops[op] if isinstance(right,Const): if not self.mode.truth: op = self._rev_ops[op] if op=='in' or op=='not in': cond = compileIn(left,right.value,op=='in') if cond is not None: return cond else: if op=='is' or op=='is not': if right.value is None: right = ICriterion(NoneType) else: right = ICriterion(Pointer(right.value)) if op=='is not': right = ~right else: right = Inequality(op,right.value) return Signature([(left, right)]) # Both sides involve variables or an un-optimizable constant, # so it's a generic boolean criterion :( return expressionSignature( self.expr_builder.Compare(initExpr,((op,other),)), self.mode ) def And(self,items): return reduce(operator.and_,[build(self,expr) for expr in items]) def Or(self,items): return reduce(operator.or_,[build(self,expr) for expr in items]) def compileIn(expr,criterion,truth): """Return a signature or predicate (or None) for 'expr in criterion'""" try: iter(criterion) except TypeError: return applyCriterion(expr,criterion,truth) if truth: return or_criteria(expr,[Inequality('==',v) for v in criterion]) else: return and_criteria(expr,[Inequality('<>',v) for v in criterion]) [dispatch.on('criterion')] def applyCriterion(expr,criterion,truth): """Apply 'criterion' to 'expr' (ala 'expr in criterion') -> sig or pred""" [applyCriterion.when(ICriterion)] def applyICriterion(expr,criterion,truth): if not truth: criterion = ~criterion return Signature([(expr,criterion)]) [applyCriterion.when(object)] def applyDefault(expr,criterion,truth): return None # no special application possible def or_criteria(expr,seq): seq = [Signature([(expr,v)]) for v in seq] if len(seq)==1: return seq[0] return Predicate(seq) def and_criteria(expr,seq): it = iter(seq) for val in it: break else: raise ValueError("No criteria supplied!") for next in it: val &= next return Signature([(expr,val)]) class NotBuilder(CriteriaBuilder): mode = TruthCriterion(False) def Not(self,expr): try: self.__class__ = CriteriaBuilder return build(self,expr) finally: self.__class__ = NotBuilder # Negative logic for and/or And = CriteriaBuilder.Or Or = CriteriaBuilder.And def _yield_tuples(ob): if isinstance(ob,tuple): for i1 in ob: for i2 in _yield_tuples(i1): yield i2 else: yield ob [expressionSignature.when( # matches 'isinstance(expr,Const)' "expr in Call and expr.function is isinstance" " and len(expr.argexprs)==2 and expr.argexprs[1] in Const" )] def convertIsInstanceToClassCriterion(expr,criterion): seq = _yield_tuples(expr.argexprs[1].value) expr = expr.argexprs[0] if not criterion.truth: return and_criteria(expr,[~ICriterion(cls) for cls in seq]) return or_criteria(expr,map(ICriterion,seq)) [expressionSignature.when( # matches 'issubclass(expr,Const)' "expr in Call and expr.function is issubclass" " and len(expr.argexprs)==2 and expr.argexprs[1] in Const" )] def convertIsSubclassToSubClassCriterion(expr,criterion): seq = _yield_tuples(expr.argexprs[1].value) expr = expr.argexprs[0] if not criterion.truth: return and_criteria(expr,[~SubclassCriterion(cls) for cls in seq]) return or_criteria(expr,map(SubclassCriterion,seq)) PKź6]Mzkzkdispatch/combiners.txt================== Result Combination ================== Sometimes, more than one method of a generic function (or ``Dispatcher`` entry) applies in a given circumstance. For example, you might need to sum the results of a series of pricing rules in order to compute a product's price. Or, sometimes you'd like a method to be able to modify the result of a less-specific method. For these scenarios, you will want to use one of several "result combination" techniques, ranging from using a provided subclass of ``GenericFunction`` or ``Dispatcher``, to rolling your own entirely custom combination approach. >>> import dispatch >>> from dispatch import strategy, functions, combiners, NoApplicableMethods .. contents:: **Table of Contents** -------------------------- The "Standard" Combination -------------------------- The default ``Dispatcher`` class only supports returning the most-specific value, or raising ``NoApplicableMethods`` or ``AmbiguousMethod`` errors. But, the default ``GenericFunction`` class implements a result combination strategy similar to the "standard method combination" for generic functions in the Common Lisp Object System (CLOS). Specifically, it supports methods calling the "next method", and it supports "before", "after", and "around" methods, in an ordering similar to that of CLOS. Let's go over each of these concepts in turn. But as we do, please keep in mind that because of PyProtocols' "logical implication" approach to method ordering, before/after/around methods should be needed less often than they would be in CLOS. So, make sure you actually need a particular feature before making your code more complicated than it needs to be. Using ``next_method`` ===================== By default, a ``GenericFunction`` will only invoke the most-specific applicable method. However, if you add a ``next_method`` argument to the beginning of an individual method's signature, you can use it to call the "next method" that applies. That is, the second-most-specific method. If that method also has a ``next_method`` argument, it too will be able to invoke the next method after it, and so on, down through all the applicable methods. For example:: >>> class NextMethodExample: ... [dispatch.generic()] ... def foo(self,bar,baz): ... """Foo bar and baz""" ... ... [foo.when("bar>1 and baz=='spam'")] ... def foo_one_spam(next_method, self, bar, baz): ... return bar + next_method(self,bar,baz) ... ... [foo.when("baz=='spam'")] ... def foo_spam(self, bar, baz): ... return 42 ... ... [foo.when("baz=='blue'")] ... def foo_spam(next_method, self, bar, baz): ... ... # if next_method is an instance of DispatchError, it means ... # that calling it will raise that error (NoApplicableMethods ... # or AmbiguousMethod) ... assert isinstance(next_method, dispatch.DispatchError) ... ... # but we'll call it anyway, just to demo the error ... return 22 + next_method(self,bar,baz) >>> NextMethodExample().foo(2,"spam") # 2 + 42 44 >>> NextMethodExample().foo(2,"blue") # 22 + ...no next method! Traceback (most recent call last): File ... combiners.txt... in foo_spam return 22 + next_method(self,bar,baz) ... NoApplicableMethods: ... Notice that ``next_method`` comes *before* ``self`` in the arguments if the generic function is an instance method. (If used, it must be the *very first* argument of the method.) Its value is supplied automatically by the generic function machinery, so when you call ``next_method`` you do not have to care whether the next method needs to know *its* next method; just pass in all of the *other* arguments (including ``self`` if applicable) and the ``next_method`` implementation will do the rest. (For implementation details, see the ``strategy.method_chain()`` function, which is described in the `Method Combination Utilities`_ section below.) Also notice that methods that do not call their next method do not need to have a ``next_method`` argument. If a method calls ``next_method`` when there are no further methods available, ``NoApplicableMethods`` is raised. Similarly, if there is more than one "next method" and they are all equally specific (i.e. ambiguous), then ``AmbiguousMethod`` is raised. Most of the time, you will know when writing a routine whether it's safe to call ``next_method``. But sometimes you need a routine to behave differently depending on whether a next method is available. If calling ``next_method`` will raise an error, then ``next_method`` will be an instance of the error class, so you can detect it with ``isinstance()``. If there are no remaining methods, then ``next_method`` will be an instance of ``NoApplicableMethods``, and if the next method is ambiguous, it will be an ``AmbiguousMethod`` instance. In either case, calling ``next_method`` will raise that error with the supplied arguments. Before/After Methods ==================== Sometimes you'd like for some additional validation or notification to occur before or after the "normal" or "primary" methods. This is what "before", "after", and "around" methods are for. For example:: >>> class BankAccount: ... ... def __init__(self,balance,protection=0): ... self.balance = balance ... self.protection = protection ... ... [dispatch.generic()] ... def withdraw(self,amount): ... """Withdraw 'amount' from bank""" ... ... [withdraw.when(strategy.default)] # nominal case ... def withdraw(self,amount): ... self.balance -= amount ... ... [withdraw.before("amount>self.balance and self.protection==0")] ... def prevent_overdraft(self,amount): ... raise ValueError("Insufficient funds") ... ... [withdraw.after("amount>self.balance")] ... def automatic_overdraft(self,amount): ... print "Transferring",-self.balance,"from overdraft protection" ... self.protection += self.balance ... self.balance = 0 >>> acct = BankAccount(200) >>> acct.withdraw(400) Traceback (most recent call last): ... ValueError: Insufficient funds >>> acct.protection = 300 >>> acct.withdraw(400) Transferring 200 from overdraft protection >>> acct.balance 0 >>> acct.protection 100 This specific example could have been written entirely with normal ``when()`` methods, by using more complex conditions. But, in more complex scenarios, where different modules may be adding rules to the same generic function, it's not possible for one module to predict whether its conditions will be more specific than another's, and whether it will need to call ``next_method``, etc. So, generic functions offer ``before()`` and ``after()`` methods, that run before and after the ``when()`` (aka "primary") methods, respectively. Unlike primary methods, ``before()`` and ``after()`` methods: * Are allowed to have ambiguous conditions (and if they do, they execute in the order in which they were added to the generic function) * Are *always* run when their conditions apply, with no need to call ``next_method`` to invoke the next method * Cannot return a useful value and do not have access to the return value of any other method The overall order of method execution is: 1. All applicable ``before()`` methods, from most-specific to least-specific, methods at the same level of specificity execute in the order they were added. 2. Most-specifc primary method, which may optionally chain to less-specific primary methods. ``AmbiguousMethod`` or ``NoApplicableMethods`` may be raised if the most-specific method is ambiguous or no primary methods are applicable. 3. All applicable ``after()`` methods, from *least-specific* to most-specific, with methods at the same level of specificity executing in the reverse order from the order they were added. (In other words, the more specific the ``after()`` condition, the "more after" it gets run!) If any of these methods raises an uncaught exception, the overall function execution terminates at that point, and methods later in the order are not run. "Around" Methods ================ Sometimes you need to recognize certain special cases, and perhaps not run the entire generic function, or need to alter its return value in some way, or perhaps trap and handle certain exceptions, etc. You can do this with "around" methods, which run "around" the entire "before/primary/after" sequence described in the previous section. A good way to think of this is that it's as if the "around" methods form a separate generic function, whose default (least-specific) method is the original, "inner" generic function. When "around" methods are applicable on a given invocation of the generic function, the most-specific "around" method is invoked. It may then choose to call its ``next_method`` to invoke the next-most-specific "around" method, and so on. When there are no more "around" methods, calling ``next_method`` instead invokes the "before", "primary", and "after" methods, according to the sequence described in the previous section. For example:: >>> if [BankAccount.withdraw.around("amount > self.balance")]: # Python 2.3 ... def overdraft_fee(next_method,self,amount): ... print "Adding overdraft fee of $25" ... return next_method(self,amount+25) >>> acct.withdraw(20) Adding overdraft fee of $25 Transferring 45 from overdraft protection (Note: the ``if`` block should be replaced by a decorator in normal code; it needs to be as shown for ``doctest`` to properly parse the above test in Python versions < 2.4.) ------------------------- Custom Result Combination ------------------------- If none of the supplied ``Dispatcher`` or ``GenericFunction`` subclasses directly meet your needs, you'll want to implement a custom subclass that overides the ``combine()`` method to implement your custom algorithm. The ``combine()`` method takes one argument: a sequence of ``(signature,res)`` tuples (also known as "cases"), where ``res`` is either a dispatcher result or a generic function method, and ``signature`` is an ``ISignature`` describing the condition under which that result or method should apply. The ``combine()`` method must then return a single callable (for generic functions) or a single result (for dispatcher classes). It may raise ``AmbiguousMethod`` or ``NoApplicableMethods`` to indicate an error condition. Initially, the input sequence will be in definition order. That is, each case (``(signature,res)`` pair) will appear in the order it was added to the dispatcher or generic function. It is up to the ``combine()`` method to do any re-ordering or sorting desired. For your convenience, the ``dispatch.strategy`` module includes several useful functions for sorting, filtering, and combining methods from the input sequence. Method Combination Utilities ============================ The following method combination utilities are available from the ``dispatch.strategy`` module. They can be assembled in various ways to create interesting method combinations: ``single_best(cases)`` Return the single "best" method or value from `cases`. That is, the method or value of the case whose signature is most specific. If it's ambiguous as to which is most specific, an ``AmbiguousMethod`` instance is returned. If ``cases`` is empty, then a ``NoApplicableMethods`` instance is returned:: >>> strategy.single_best([]) NoApplicableMethods() >>> strategy.single_best([(strategy.Signature(x=int),1)]) 1 >>> strategy.single_best( ... [(strategy.Signature(x=object),1),(strategy.Signature(x=int),2)] ... ) 2 >>> strategy.single_best( ... [(strategy.Signature(x=int),1),(strategy.Signature(x=int),2)] ... ) AmbiguousMethod([(Signature((x...int), 1), (Signature((x...int), 2)],) This function implements the default ``Dispatcher`` combination strategy, or the generic function strategy in the absence of ``next_method`` and before/after/around methods. ``ordered_signatures(cases)`` Yields a series of cases grouped by specificity, such that each group is a set of equally-specific cases, but which are more specific than the cases in groups that follow. The grouped cases can then be passed to ``safe_methods()`` or ``all_methods()`` in order to extract methods for combining. Note that groups containing more than one case are *ambiguous*. That is, it is not statically determinable which cases are more specific than the others. In general, a dispatcher should raise ``AmbiguousMethod`` if the first group yielded by this function has a length greater than 1. ``safe_methods(grouped_cases)`` Yields methods from the grouped cases until an ambiguous group is found or the input is exhausted. An ambiguous group in the input will be replaced by a callable in the output that raises ``AmbiguousMethod`` when called. >>> list(strategy.safe_methods([])) [] >>> list(strategy.safe_methods([[(1,2)],[(3,4)],[(5,6)]])) [2, 4, 6] >>> list(strategy.safe_methods([[(1,2)],[(3,4),(5,6)]])) [2, AmbiguousMethod([(3, 4), (5, 6)],)] ``all_methods(grouped_cases)`` Yields all methods from the grouped cases, including ones in ambiguous groups. >>> list(strategy.all_methods([])) [] >>> list(strategy.all_methods([[(1,2)],[(3,4),(5,6)]])) [2, 4, 6] ``method_chain(methods)`` Returns a callable that invokes the first method in ``methods``. If that method has a ``next_method`` parameter, then when called it will be passed an extra argument, pointing to the next applicable method in ``methods``, and so on recursively, until a method without a ``next_method`` parameter is reached. (Thus, if the first method in ``methods`` does not have a ``next_method`` parameter, it is returned directly.) If there are no methods in ``methods``, then a dummy method is returned that raises ``NoApplicableMethods`` when called. >>> def f1(next_method): print "f1"; return next_method() >>> def f2(next_method): print "f2"; return next_method() >>> def f3(): print "f3"; return "done" >>> strategy.method_chain([f1,f2,f3])() f1 f2 f3 'done' >>> strategy.method_chain([])() Traceback (most recent call last): ... NoApplicableMethods... >>> mc1 = strategy.method_chain([f2,f3]) >>> mc1() f2 f3 'done' >>> mc2 = strategy.method_chain([f1,mc1]) >>> mc2() f1 f2 f3 'done' ``method_list(methods)`` Returns a callable that when called, yields the results of calling each of the supplied methods in turn with the same arguments: >>> def f1(x): return "f1"+x >>> def f2(x): return "f2"+x >>> for item in strategy.method_list([f1,f2])("y"): print item f1y f2y >>> list(strategy.method_list([])()) # empty method list yields no results [] Custom Method Qualfiers ======================= If the standard before/after/around/when decorators don't work for your application, you can create custom ones by subclassing ``AbstractGeneric`` and defining your own "method qualifiers". Here's an example of a "pricing rules" generic function that accomodates tax and discounts as well as upcharges. (Don't worry if you don't understand it at first glance; we'll go over the individual parts in detail later.):: >>> class Pricing(functions.AbstractGeneric): ... """Implement a generic pricing rule with add-ons, tax, etc.""" ... ... def add_when(self,cond): self._decorate(cond,"add") ... def tax_when(self,cond): self._decorate(cond,"tax") ... def discount_when(self,cond): self._decorate(cond,"discount") ... ... def combine(self,cases): ... cases = strategy.separate_qualifiers( ... cases, add=[ ... strategy.ordered_signatures, strategy.all_methods, ... strategy.method_list ... ], ... ) ... discount = strategy.single_best(cases.get('discount',())) ... tax = strategy.single_best(cases.get('tax',())) ... ... def combined(*args,**kw): ... price = sum(cases['add'](*args,**kw)) ... if not isinstance(discount,NoApplicableMethods): ... price -= discount(*args,**kw) * price ... if not isinstance(tax,NoApplicableMethods): ... price += tax(*args,**kw) * price ... return price ... ... return combined The ``_decorate`` method of ``AbstractGeneric`` implements a simple function decorator similar to ``when()`` et al., so ``Pricing`` generic functions will have ``add_when()``, ``tax_when()``, and ``discount_when()`` decorator methods. The functions decorated by these methods are then tracked with "qualifiers" indicating what kind of method they are, so that ``combine()`` can then separate them out of the list of applicable methods for a given situation. ``combine()`` then creates a closure (``combined``) that combines the effects of the applicable methods. We can now use this pricing class to implement a generic function:: >>> class Product: ... [dispatch.generic(Pricing)] ... def getPrice(product,customer=None,options=()): ... """Get this product's price""" ... ... [getPrice.add_when(strategy.default)] ... def __addBasePrice(product,customer,options): ... """Always include the product's base price""" ... return product.base_price >>> shoes = Product() >>> shoes.base_price = 42 And then we can create some pricing rules (again, these "if" blocks should be decorators; they have to be this way to support running the doctests in Python 2.3):: >>> if [Product.getPrice.add_when("'blue suede' in options")]: ... def blueSuedeUpcharge(product,customer,options): ... return 24 ... >>> if [Product.getPrice.discount_when( ... "customer=='Elvis' and 'blue suede' in options and product is shoes" ... )]: ... def ElvisGetsTenPercentOff(product,customer,options): ... return .1 And now we can try them out:: >>> shoes.getPrice() 42 >>> shoes.getPrice(options=['blue suede']) 66 >>> print shoes.getPrice('Elvis',options=['blue suede']) 59.4 >>> shoes.getPrice('Elvis') # no suede, no discount! 42 Now, let's look at the function that was used in ``combine()`` to separate and preprocess the applicable methods. ``separate_qualifiers(qualified_cases, **postprocess)`` Turn a list of cases with possibly-qualified methods into a dictionary mapping qualifiers to (possibly post-processed) case lists. If a given method is not qualified, it's treated as though it had the qualifier '"primary"'. Keyword arguments supplied to this function are treated as a mapping from qualifiers to lists of functions that should be applied to the list of cases to that qualifier. So, for example, this:: cases = separate_qualifiers(cases, primary=[strategy.ordered_signatures,strategy.safe_methods], ) is equivalent to:: cases = separate_qualifiers(cases) if "primary" in cases: cases["primary"]=safe_methods(ordered_signatures(cases["primary"])) Notice, by the way, that the postprocessing functions must be listed in order of *application* (i.e. outermost last). Some examples/tests:: >>> def f1(x): pass >>> def f2(x): pass >>> def f3(x): pass >>> mixed = [(1,("x",f1)),(2,("x",f2)),(3,("y",f3))] >>> strategy.separate_qualifiers(mixed) # doctest: +NORMALIZE_WHITESPACE {'y': [(3, )], 'x': [(1, ), (2, )]} >>> flat = [(1,f1),(2,f2),(3,("y",f3))] >>> strategy.separate_qualifiers(flat) # doctest: +NORMALIZE_WHITESPACE {'y': [(3, )], 'primary': [(1, ), (2, )]} >>> strategy.separate_qualifiers(flat,primary=[strategy.method_chain]) {'y': [(3, )], 'primary': (1, )} So, now that you know how ``separate_qualifiers()`` works, you can go back and see what the ``Pricing.combine()`` method is doing, and begin thinking about when you might want to create custom combinations of your own. Map Dispatchers =============== Map dispatchers are ``Dispatcher`` subclasses, typically used for class and attribute metadata such as what command line options are associated with a class' attributes. Map dispatchers merge the metadata that was defined for a class and its ancestors, detecting any ambiguities between specific metadata items defined in different base classes, or defined by multiple rules for the same class. (Actually, they use normal implication precedence, but for simple metadata registries, this usually maps directly to the inheritance structure of the target classes.) In essence, one defines the metadata as a set of keys and values. The map combiner builds a map of the "most specific" applicable values. The keys and values are extracted from each applicable rule or method in the dispatcher or generic function, and then they are merged in precedence order. If there are two rules at the same precedence level, and they share any keys, the values they provide for those keys must be equal, or an ambiguity occurs. (Unless, that is, those keys were already unambiguously defined at a higher precedence level.) To start, we'll define a basic class hierarchy, shaped basically like this:: A / \ B C \ / D By creating these classes, and some signatures to use in their place:: >>> class A: pass >>> class B(A): pass >>> class C(A): pass >>> class D(B,C): pass >>> a = strategy.Signature(x=A) >>> b = strategy.Signature(x=B) >>> c = strategy.Signature(x=C) >>> d = strategy.Signature(x=D) Our example map combiner will use functions as its rules, with function attributes serving as keys and values. We'll define some functions that have the same keys but different values, some with the same keys and same values, and some with different keys. And, we'll also create a rule that means "ignore any lower-precedence rules":: >>> def r1(): pass >>> r1.key_a = 1 >>> def r2(): pass >>> r2.key_a = 2 # same key, different value >>> def r3(): pass >>> r3.key_a = 2 # same key, same value >>> def r4(): pass >>> r4.key_a = 4 # same key, different value >>> r4.key_b = 42 # different key >>> def r5(): pass >>> r5.stop = True # "stop processing rules" Next, we'll need a ``MapDispatcher`` subclass that can interpret this rule schema:: >>> class ExampleDispatcher(combiners.MapDispatcher): ... def getItems(self,signature,rule): ... # get function attributes ... return [kv for kv in rule.__dict__.items() if kv[0]<>'stop'] ... def shouldStop(self,signature,rule): ... return getattr(rule,'stop',False) And we need an instance of it to use as a dispatcher, whose ``combine`` method we'll be testing:: >>> disp = ExampleDispatcher(['x']) >>> combine = disp.combine In the simplest possible case, combining no results should return an empty dictionary:: >>> combine([]) {} And supplying a single result will return a dictionary containing that rule's attributes:: >>> combine([(a,r1)]) {'key_a': 1} >>> combine([(a,r4)]) {'key_a': 4, 'key_b': 42} When supplying more than one result, the one with higher precedence should take effect, regardless of the order in which they are supplied:: >>> combine([(b,r2),(a,r1)]) # most-specific first {'key_a': 2} >>> combine([(a,r1),(b,r2)]) # least-specific first {'key_a': 2} And values for keys on lower-precedence rules should still "show through" if there is no higher-precedence value defined for a given key:: >>> combine([(b,r2),(a,r4)]) {'key_a': 2, 'key_b': 42} But rules at the same precedence levels with the same keys should return an ``AmbiguousMethod`` instance:: >>> combine([(b,r1),(c,r2)]) AmbiguousMethod('key_a', 2, 1) Unless of course the keys have the same values:: >>> combine([(b,r2),(c,r3)]) {'key_a': 2} >>> combine([(b,r2),(c,r3),(a,r4)]) # should still merge other key {'key_a': 2, 'key_b': 42} Or the conflicting key is already given a value with higher precedence:: >>> combine([(b,r1),(c,r2),(d,r4)]) {'key_a': 4, 'key_b': 42} Or if a "stop" is requested at a higher precedence:: >>> combine([(b,r1),(c,r2),(d,r5)]) {} Notice that once a "stop" takes effect, no lower-precedence rules are handled:: >>> combine([(c,r1),(b,r5),(a,r4)]) {'key_a': 1} So, now that we've verified that our ``combine()`` method works, we should be able to use our dispatcher as if it were a normal ``Dispatcher`` instance (but which provides appropriately-combined results):: >>> disp["x in A"] = r4 >>> disp["x in B"] = r1 >>> disp["x in C"] = r2 >>> disp["x in D"] = r3 >>> disp[A(),] {'key_a': 4, 'key_b': 42} >>> disp[B(),] {'key_a': 1, 'key_b': 42} >>> disp[C(),] {'key_a': 2, 'key_b': 42} >>> disp[D(),] {'key_a': 2, 'key_b': 42} Voila! Now we can make direct use of the metadata mapping that's returned by the dispatcher for an instance of a given class. Note, by the way, that the results are cached by the dispatcher, so a given set of methods is only combined once (unless new methods are added or criteria such as protocols are updated):: >>> disp[D(),] is disp[D(),] # same object returned from two calls 1 Finally, let's verify that adding an ambiguous or conflicting definition results in an error at dispatch time:: >>> disp["x in D"] = r1 # we previously defined it as r3 >>> disp[D(),] Traceback (most recent call last): ... AmbiguousMethod: ('key_a', 1, 2, (<...D instance at ...>,), {}) As you can see, ``r1`` and ``r3`` have conflicting definitions for the value of ``'key_a'``, so making them both applicable to ``D`` instances creates an ambiguity. PKs6vdispatch/_d_speedups.pydMZ@ !L!This program cannot be run in DOS mode. $PEL>F# 8b0w R ` .text`b``.dataf@.rdatapl@@.bss.edataRr@@.idata t@.reloc` ~@BUWVS } |$]M\$ $Zƃ 1t[tE 9rtЋ9st&$^1҉$^uuF1t&e[^_] t&$^tlZvZ&v'9rtЋ9st&$16^=$^o*^ 1_'UET$L$$]ɃÍUEL$T$$]ÐU]ZUVS uEEED$ED$D$ <D$ ED$E $EEE$@u+^EF 8u F P$REF 'D$ MS>u F4$PE8u EP$RE8u EP$RډЃ [^]UWVS}E@D$$NÅuDPD$$dƅuD ;u C$P$ÅuDx D$4$XEuDs>u F4$P ;u C$PE$@EtDU