# coding: utf-8
from __future__ import (absolute_import, print_function, division, unicode_literals)
import os.path
import itertools
import re

# One or more consecutive slashes count as a single path separator
_tolist_split = re.compile(r'/+')


def _tolist(path):
    """
    Split a path into a list of components.

    Leading and trailing slashes are stripped and runs of slashes are treated
    as a single separator. A path that is empty after stripping gives an
    empty list.
    """
    path = path.strip('/')
    if path == "":
        return []
    return _tolist_split.split(path)


class Tree(object):
    """
    Generic interface for tree operations.

    Methods prefixed with 'l' take a list of split path components as a path
    element, their equivalent without the leading 'l' take a string as a path
    component.

    The string variants only split the path and delegate to the 'l' variants,
    forwarding any keyword arguments, so subclasses normally override the 'l'
    variants only.
    """

    def __init__(self, name=None, doc=None):
        """
        Name is the name of this tree. This allows to use the tree as a
        subtree inside another, and still get meaningful exception messages.

        A missing or empty name defaults to "/". ``doc`` is the documentation
        returned by the default ``ldoc`` implementation.
        """
        self._name = name or "/"
        self._documentation = doc

    def has(self, path, **kw):
        """
        Return True if the path exists, else False
        """
        return self.lhas(_tolist(path), **kw)

    def lhas(self, path, **kw):
        """
        Return True if the path exists, else False

        The base implementation knows no paths and always returns False.
        """
        return False

    def get(self, path, **kw):
        """
        Returns the object at the given path, or None if path does not exist
        """
        return self.lget(_tolist(path), **kw)

    def lget(self, path, **kw):
        """
        Returns the object at the given path, or None if path does not exist

        The base implementation knows no paths and always returns None.
        """
        return None

    def set(self, path, value, **kw):
        """
        Update the object at the given path using the ``value`` object.
        """
        return self.lset(_tolist(path), value, **kw)

    def lset(self, path, value, **kw):
        """
        Update the object at the given path using the ``value`` object.

        The base implementation always raises KeyError.
        """
        raise KeyError("cannot set the value of path %s" % os.path.join(self._name, *path))

    def list(self, path, **kw):
        """
        Return a sequence with the children of the object at the given path.

        The elements of the sequence can be the same objects that would be
        returned by issuing a ``get`` on each of the children, but can also be
        lightweight "summary" objects (for example, a string with the name of
        the child node) in those cases where instantiating an object for every
        child would be overkill for a list operation.
        """
        return self.llist(_tolist(path), **kw)

    def llist(self, path, **kw):
        """
        Return a sequence with the children of the object at the given path.

        The elements of the sequence can be the same objects that would be
        returned by issuing a ``get`` on each of the children, but can also be
        lightweight "summary" objects (for example, a string with the name of
        the child node) in those cases where instantiating an object for every
        child would be overkill for a list operation.

        The base implementation always raises KeyError.
        """
        raise KeyError("cannot list the children of path %s" % os.path.join(self._name, *path))

    def delete(self, path, **kw):
        """
        Delete the object at the given path, and all its subobjects
        """
        return self.ldelete(_tolist(path), **kw)

    def ldelete(self, path, **kw):
        """
        Delete the object at the given path, and all its subobjects

        The base implementation always raises KeyError.
        """
        raise KeyError("cannot delete path %s" % os.path.join(self._name, *path))

    def create(self, path, value=None, **kw):
        """
        Create the object at the given path.

        If value is given, it is used to initialise the object as if a set had
        been done with that value. Otherwise, the object is created using
        default values.
        """
        # Forward keyword arguments like every other string-path wrapper does
        return self.lcreate(_tolist(path), value, **kw)

    def lcreate(self, path, value=None, **kw):
        """
        Create the object at the given path.

        If value is given, it is used to initialise the object as if a set had
        been done with that value. Otherwise, the object is created using
        default values.

        The base implementation always raises KeyError.
        """
        raise KeyError("cannot create path %s" % os.path.join(self._name, *path))

    def doc(self, path, **kw):
        """
        Return documentation for the given path.

        Returns None if no documentation is available
        """
        return self.ldoc(_tolist(path), **kw)

    def ldoc(self, path, **kw):
        """
        Return documentation for the given path.

        Returns None if no documentation is available

        The base implementation ignores the path and returns the
        documentation given to the constructor.
        """
        return self._documentation


class Subtree(Tree):
    """
    A tree that gives proxy access to a subtree of another tree

    Every operation is forwarded to the wrapped tree with the subtree root
    prepended to the requested path.
    """

    def __init__(self, tree, root):
        """
        ``tree`` is the tree to wrap, ``root`` is the string path, inside
        ``tree``, of the node that becomes the root of this subtree.
        """
        root = root.strip('/')
        super(Subtree, self).__init__(os.path.join(tree._name, root))
        self.tree = tree
        # Path components of the subtree root, prepended to every request
        self.root = _tolist(root)

    def lhas(self, path, **kw):
        return self.tree.lhas(self.root + path, **kw)

    def lget(self, path, **kw):
        return self.tree.lget(self.root + path, **kw)

    def lset(self, path, value, **kw):
        # Propagate the wrapped tree's result, like the other proxy methods
        return self.tree.lset(self.root + path, value, **kw)

    def llist(self, path, **kw):
        return self.tree.llist(self.root + path, **kw)

    def ldelete(self, path, **kw):
        return self.tree.ldelete(self.root + path, **kw)

    def lcreate(self, path, value=None, **kw):
        return self.tree.lcreate(self.root + path, value, **kw)

    def ldoc(self, path, **kw):
        return self.tree.ldoc(self.root + path, **kw)


class Forest(Tree):
    """
    Maintains a virtual tree of tree items, and dispatches requests to the tree
    items that are registered

    The first path component selects a registered branch; the remaining
    components are passed to that branch.
    """

    def __init__(self, name="/", doc=None):
        """
        Name is the name of this forest. This allows to use a forest as a
        subtree inside another, and still get meaningful exception messages.
        """
        super(Forest, self).__init__(name, doc)
        # Registered trees, indexed by their name
        self.branches = {}

    def _get(self, **kw):
        """
        Get the value associated with this tree node
        """
        return None

    def _set(self, value, **kw):
        """
        Set the value associated with this tree node

        The default implementation always raises KeyError.
        """
        raise KeyError("cannot set the value of path %s" % self._name)

    def register(self, tree):
        """
        Register the given tree object to answer queries at the given branch

        The branch name is the name of the tree. Registering a tree with the
        name of an existing branch replaces that branch.
        """
        self.branches[tree._name] = tree

    def branch(self, branch):
        """
        Return the tree registered with the given name, or None if there is
        none
        """
        return self.branches.get(branch)

    def lhas(self, path, **kw):
        """
        Return True if the path exists, else False
        """
        if not path:
            return True
        tree = self.branches.get(path[0])
        if tree is None:
            return False
        if len(path) == 1:
            # A registered branch exists by definition, without asking it
            return True
        return tree.lhas(path[1:], **kw)

    def lget(self, path, **kw):
        """
        Returns the object at the given path, or None if path does not exist
        """
        if not path:
            return self._get(**kw)
        tree = self.branches.get(path[0])
        if tree is None:
            return None
        return tree.lget(path[1:], **kw)

    def lset(self, path, value, **kw):
        """
        Update the object at the given path using the ``value`` object.

        Raises KeyError if the first path component is not a registered
        branch.
        """
        if not path:
            return self._set(value, **kw)
        tree = self.branches.get(path[0])
        if tree is None:
            raise KeyError("path {} does not exist".format(os.path.join(self._name, path[0])))
        return tree.lset(path[1:], value, **kw)

    def llist(self, path, **kw):
        """
        Return a sequence with the children of the object at the given path.

        The elements of the sequence can be the same objects that would be
        returned by issuing a ``get`` on each of the children, but can also be
        lightweight "summary" objects in those cases where instantiating an
        object for every child would be overkill for a list operation

        Listing the forest itself gives the names of the registered branches.
        Raises KeyError if the first path component is not a registered
        branch.
        """
        if not path:
            return list(self.branches.keys())
        tree = self.branches.get(path[0])
        if tree is None:
            raise KeyError("path %s does not exist" % os.path.join(self._name, path[0]))
        return tree.llist(path[1:], **kw)

    def ldelete(self, path, **kw):
        """
        Delete the object at the given path, and all its subobjects

        Neither the forest itself (LookupError) nor a registered branch
        (KeyError) can be deleted; longer paths are forwarded to the branch.
        """
        if not path:
            raise LookupError("tree %s cannot delete self" % self._name)
        tree = self.branches.get(path[0])
        if tree is None:
            raise KeyError("path %s does not exist" % os.path.join(self._name, path[0]))
        if len(path) == 1:
            raise KeyError("cannot delete path %s" % os.path.join(self._name, path[0]))
        return tree.ldelete(path[1:], **kw)

    def lcreate(self, path, value=None, **kw):
        """
        Create the object at the given path.

        If value is given, it is used to initialise the object as if a set had
        been done with that value. Otherwise, the object is created using
        default values.

        Neither the forest itself (LookupError) nor a direct child (KeyError)
        can be created; longer paths are forwarded to the branch.
        """
        if not path:
            raise LookupError("tree %s cannot create self" % self._name)
        if len(path) == 1:
            raise KeyError("path %s cannot be created" % os.path.join(self._name, path[0]))
        tree = self.branches.get(path[0])
        if tree is None:
            raise KeyError("path %s does not exist" % os.path.join(self._name, path[0]))
        return tree.lcreate(path[1:], value, **kw)

    def ldoc(self, path, **kw):
        """
        Return the documentation of the forest for an empty path, else the
        documentation reported by the selected branch, or None if the branch
        does not exist.
        """
        if not path:
            return super(Forest, self).ldoc(path, **kw)
        tree = self.branches.get(path[0])
        if tree is None:
            return None
        return tree.ldoc(path[1:], **kw)


class Parrot(Tree):
    """
    Tree whose every method raises an exception with the method name and
    arguments

    Keyword arguments are accepted, so that calls forwarded with options
    still raise the descriptive exception, but they are not reported.
    """

    def __init__(self, name="/"):
        super(Parrot, self).__init__(name)

    def lhas(self, path, **kw):
        raise Exception("%s: has %s" % (self._name, "/".join(path)))

    def lget(self, path, **kw):
        raise Exception("%s: get %s" % (self._name, "/".join(path)))

    def lset(self, path, value, **kw):
        raise Exception("%s: set %s \"%s\"" % (self._name, "/".join(path), value))

    def llist(self, path, **kw):
        raise Exception("%s: list %s" % (self._name, "/".join(path)))

    def ldelete(self, path, **kw):
        raise Exception("%s: delete %s" % (self._name, "/".join(path)))

    def lcreate(self, path, value=None, **kw):
        raise Exception("%s: create %s \"%s\"" % (self._name, "/".join(path), value))

    def ldoc(self, path, **kw):
        raise Exception("%s: doc %s" % (self._name, "/".join(path)))


class PyTree(Tree):
    """
    Tree node built using a python class hierarchy, and property-aware

    Attributes that are Tree instances act as subtrees, and properties
    defined on the class act as leaf values.
    """

    def __init__(self, name="/", doc=None):
        super(PyTree, self).__init__(name, doc)

    def _asproperty(self, name):
        """
        Return the property called ``name`` defined on the class, or None if
        there is no such property
        """
        # Look the name up on the class: on the instance the property would
        # be evaluated instead of returned
        m = getattr(self.__class__, name, None)
        if isinstance(m, property):
            return m
        return None

    def _astree(self, name):
        """
        Return the attribute called ``name`` if it is a Tree, else None

        The attribute is looked up on the instance, so if ``name`` is a
        property its getter is run.
        """
        m = getattr(self, name, None)
        if isinstance(m, Tree):
            return m
        return None

    def _has(self, **kw):
        """
        Report if this node exists.

        There is probably no reason for this not to be true, but this is still
        implemented in a hookable way to keep the interface consistent.
        """
        return True

    def _get(self, **kw):
        """
        Get the value associated with this tree node
        """
        return None

    def _set(self, value, **kw):
        """
        Set the value associated with this tree node

        The default implementation always raises KeyError.
        """
        raise KeyError("cannot set the value of path %s" % self._name)

    def _list(self, **kw):
        """
        List the values associated with this tree node

        Returns the names of all properties and Tree attributes of this
        object, in the order given by ``dir``.
        """
        # Test for a property first, so that property getters are not run
        # just to produce the listing
        return [name for name in dir(self)
                if self._asproperty(name) is not None or self._astree(name) is not None]

    def _delete(self, **kw):
        """
        Delete the object itself.

        Note: this requires cooperation with the upper tree node, and hence it
        probably makes no sense to override this method. The default
        implementation raises an exception, which is probably good for most of
        the time.
        """
        raise KeyError("cannot delete path %s" % self._name)

    def _doc(self, **kw):
        """
        Get the documentation for this tree node
        """
        return super(PyTree, self).ldoc([])

    def lhas(self, path, **kw):
        if not path:
            return self._has(**kw)
        sub = self._astree(path[0])
        if sub is not None:
            return sub.lhas(path[1:], **kw)
        if len(path) > 1:
            # Properties are leaves: nothing can exist below them
            return False
        return self._asproperty(path[0]) is not None

    def lget(self, path, **kw):
        if not path:
            return self._get(**kw)
        sub = self._astree(path[0])
        if sub is not None:
            return sub.lget(path[1:], **kw)
        if len(path) > 1:
            return None
        prop = self._asproperty(path[0])
        if prop is not None:
            return prop.__get__(self)
        return None

    def lset(self, path, value, **kw):
        if not path:
            return self._set(value, **kw)
        sub = self._astree(path[0])
        if sub is not None:
            return sub.lset(path[1:], value, **kw)
        if len(path) > 1:
            return super(PyTree, self).lset(path, value, **kw)
        prop = self._asproperty(path[0])
        if prop is not None:
            return prop.__set__(self, value)
        # NOTE(review): setting an unknown single-component path is silently
        # ignored, while longer unknown paths raise KeyError; kept as is in
        # case callers rely on it -- confirm whether this is intended
        return None

    def llist(self, path, **kw):
        if not path:
            return self._list(**kw)
        sub = self._astree(path[0])
        if sub is not None:
            return sub.llist(path[1:], **kw)
        if len(path) > 1 or self._asproperty(path[0]) is None:
            return super(PyTree, self).llist(path, **kw)
        # A property is a leaf, and has no children
        return []

    def ldelete(self, path, **kw):
        if not path:
            return self._delete(**kw)
        sub = self._astree(path[0])
        if sub is not None:
            return sub.ldelete(path[1:], **kw)
        return super(PyTree, self).ldelete(path, **kw)

    def lcreate(self, path, value=None, **kw):
        if not path:
            raise KeyError("cannot create path %s" % self._name)
        sub = self._astree(path[0])
        if sub is not None:
            return sub.lcreate(path[1:], value, **kw)
        return super(PyTree, self).lcreate(path, value, **kw)

    def ldoc(self, path, **kw):
        if not path:
            return self._doc(**kw)
        sub = self._astree(path[0])
        if sub is not None:
            return sub.ldoc(path[1:], **kw)
        if len(path) > 1:
            return None
        prop = self._asproperty(path[0])
        if prop is not None:
            # The documentation of a property is its docstring
            return prop.__doc__
        return None


class ReadonlyDictTree(Forest):
    """
    dict that is also a readonly tree

    The values are exposed as leaf nodes next to the branches registered in
    the forest. They can be changed with item assignment, but not through the
    tree interface.
    """

    def __init__(self, name="/", doc=None, *args, **kw):
        """
        ``name`` and ``doc`` are as in Forest; all other arguments are passed
        to ``dict`` to build the initial values.
        """
        super(ReadonlyDictTree, self).__init__(name, doc)
        self._values = dict(*args, **kw)

    def __contains__(self, key):
        """
        Return True if ``key`` is one of the stored values
        """
        return key in self._values

    def __hasitem__(self, key):
        """
        Same as ``key in self``.

        Python never calls a method with this name: it is kept only for
        callers that invoke it explicitly.
        """
        return key in self._values

    def __getitem__(self, key):
        return self._values.__getitem__(key)

    def __setitem__(self, key, val):
        return self._values.__setitem__(key, val)

    def ldoc(self, path, **kw):
        """
        Return "<tree documentation>: <key>" for a stored value, or None if
        the tree has no documentation; any other path is handled as in Forest.
        """
        if len(path) == 1 and path[0] in self._values:
            if self._documentation is None:
                # No documentation to build the description from
                return None
            return self._documentation + ": " + path[0]
        return super(ReadonlyDictTree, self).ldoc(path, **kw)

    def lhas(self, path, **kw):
        if not path:
            return True
        if len(path) == 1 and path[0] in self._values:
            return True
        return super(ReadonlyDictTree, self).lhas(path, **kw)

    def lget(self, path, **kw):
        if not path:
            # Return a copy, so that callers cannot change the stored values
            return dict(self._values)
        if len(path) == 1 and path[0] in self._values:
            return self._values[path[0]]
        return super(ReadonlyDictTree, self).lget(path, **kw)

    def llist(self, path, **kw):
        if not path:
            # Names of registered branches and stored values, merged
            return sorted(set(Forest.llist(self, [])) | set(self._values.keys()))
        if len(path) == 1 and path[0] in self._values:
            # Values are leaves
            return []
        return super(ReadonlyDictTree, self).llist(path, **kw)


class DictTree(ReadonlyDictTree):
    """
    Writable version of ReadonlyDictTree

    Stored values can be set, created and deleted through the tree interface;
    every other path is handled as in Forest.
    """

    def lset(self, path, value, **kw):
        """
        Replace an existing stored value; any other path is handled as in
        Forest.
        """
        if len(path) == 1 and path[0] in self._values:
            self._values[path[0]] = value
        else:
            return super(DictTree, self).lset(path, value, **kw)

    def ldelete(self, path, **kw):
        """
        Remove an existing stored value; any other path is handled as in
        Forest.
        """
        if len(path) == 1 and path[0] in self._values:
            del self._values[path[0]]
        else:
            return super(DictTree, self).ldelete(path, **kw)

    def lcreate(self, path, value=None, **kw):
        """
        Store ``value`` under a single-component path, replacing any value
        already stored there, and return it.

        Paths that are empty, longer than one component, or that name a
        registered branch are handled as in Forest.
        """
        if len(path) != 1 or path[0] in self.branches:
            return super(DictTree, self).lcreate(path, value, **kw)
        self._values[path[0]] = value
        return value


class DynamicView(Tree):
    """
    Tree that is a view over some dynamic database

    Children are either "specials", trees stored by name in ``self.specials``,
    or elements instantiated on demand by the ``element`` and ``elements``
    hooks. Specials take precedence over elements with the same name.
    """

    def __init__(self, name, doc=None):
        super(DynamicView, self).__init__(name, doc)
        # Trees that are always present in the view, indexed by name
        self.specials = {}

    def elements(self):
        """
        Generate the sequence of Trees on this view
        """
        return []

    def element(self, name):
        """
        Instantiate one element by name, or return None if the element does not
        exist
        """
        return None

    def new_element(self, name, value):
        """
        Create the element 'name', with the given value

        The default implementation always raises KeyError.
        """
        raise KeyError("cannot create element %s in %s" % (name, self._name))

    def delete_element(self, name):
        """
        Delete the element 'name'

        The default implementation always raises KeyError.
        """
        raise KeyError("cannot delete element %s in %s" % (name, self._name))

    def _child(self, name):
        """
        Return the special or the element called ``name``, or None if there
        is neither
        """
        if name in self.specials:
            return self.specials[name]
        return self.element(name)

    def lhas(self, path, **kw):
        if not path:
            return True
        child = self._child(path[0])
        if child is None:
            return False
        if len(path) == 1:
            # A direct child that was found exists, without asking it
            return True
        return child.lhas(path[1:], **kw)

    def lget(self, path, **kw):
        if not path:
            # The value of the view is the list of the values of its elements
            return [e.lget([]) for e in self.elements()]
        child = self._child(path[0])
        if child is None:
            return None
        return child.lget(path[1:], **kw)

    def lset(self, path, value, **kw):
        if not path:
            return super(DynamicView, self).lset(path, value, **kw)
        child = self._child(path[0])
        if child is None:
            return super(DynamicView, self).lset(path, value, **kw)
        return child.lset(path[1:], value, **kw)

    def llist(self, path, **kw):
        if not path:
            # Specials first, then elements, each group sorted by name
            return sorted(self.specials.keys()) + sorted([e._name for e in self.elements()])
        child = self._child(path[0])
        if child is None:
            return super(DynamicView, self).llist(path, **kw)
        return child.llist(path[1:], **kw)

    def lcreate(self, path, value=None, **kw):
        if not path:
            return super(DynamicView, self).lcreate(path, value, **kw)
        if path[0] in self.specials:
            return self.specials[path[0]].lcreate(path[1:], value, **kw)
        if len(path) == 1:
            # Creating a direct child means creating a new element
            return self.new_element(path[0], value)
        el = self.element(path[0])
        if el is None:
            return super(DynamicView, self).lcreate(path, value, **kw)
        return el.lcreate(path[1:], value, **kw)

    def ldelete(self, path, **kw):
        if not path:
            return super(DynamicView, self).ldelete(path, **kw)
        if path[0] in self.specials:
            return self.specials[path[0]].ldelete(path[1:], **kw)
        if len(path) == 1:
            # Deleting a direct child means deleting an element
            return self.delete_element(path[0])
        el = self.element(path[0])
        if el is None:
            return super(DynamicView, self).ldelete(path, **kw)
        return el.ldelete(path[1:], **kw)

    def ldoc(self, path, **kw):
        if not path:
            return super(DynamicView, self).ldoc(path, **kw)
        child = self._child(path[0])
        if child is None:
            return None
        return child.ldoc(path[1:], **kw)


class MockTree(Tree):
    """
    A tree containing a whole hierarchy, stored in ram.

    ldoc always returns None

    As a leaf node:
     - self.value = scalar
     - has("") → True
     - get("") → self.value
     - set("", scalar:val) → self.value = val
     - set("", dict:val) → self.value is replaced with
       { name: MockTree(name) }, and each subtree is set to its value
     - list("") → []
     - create("", val) → error
     - create("foo", val) → self.value is replaced with a dict
       { "foo": MockTree("foo") }, and the subtree is set to val
     - delete("") → error
     - delete("foo") → error

    As a tree node:
     - self.value = { name: MockTree(name) }
     - has("") → True
     - has("foo/bar") → self.value["foo"].has("bar")
     - get("") → { name: tree.get("") for name, tree in self.value.items() }
     - get("foo/bar") → self.value["foo"].get("bar")
     - set("", scalar:val) → self.value = val
     - set("", dict:val) → self.value is replaced with
       { name: MockTree(name) }, and each subtree is set to its value
     - set("foo/bar", val) → self.value["foo"].set("bar", val)
     - list("") → sorted(self.value.keys())
     - list("foo/bar") → self.value["foo"].list("bar")
     - create("", val) → error
     - create("foo", val) → self.value["foo"] is created if missing, then
       set to val
     - create("foo/bar", val) → self.value["foo"] is created if missing,
       then self.value["foo"].create("bar", val)
     - delete("") → error
     - delete("foo") → del self.value["foo"]
     - delete("foo/bar") → self.value["foo"].delete("bar")

    For paths that do not lead to a subtree, has gives False, get gives None,
    and set, list and delete are errors.
    """

    def __init__(self, name=None, doc=None, value=None):
        super(MockTree, self).__init__(name, doc)
        # A scalar if this is a leaf node, a { name: MockTree } if this node has children
        self.value = value

    def _forward(self, method, path, *args, **kw):
        """
        Call ``method`` on the subtree selected by the first component of
        ``path``, with the rest of the path.

        If there is no such subtree, call the default Tree implementation of
        ``method`` on this node, with the whole path.
        """
        subtree = None if self.is_leaf else self.value.get(path[0])
        if subtree is None:
            return getattr(super(MockTree, self), method)(path, *args, **kw)
        return getattr(subtree, method)(path[1:], *args, **kw)

    @property
    def is_leaf(self):
        """
        True if this node holds a scalar, False if it holds subtrees
        """
        return not isinstance(self.value, dict)

    def lhas(self, path, **kw):
        if not path:
            return True
        return self._forward("lhas", path, **kw)

    def lget(self, path, **kw):
        if path:
            return self._forward("lget", path, **kw)
        if self.is_leaf:
            return self.value
        return {name: subtree.lget([]) for name, subtree in self.value.items()}

    def lset(self, path, value, **kw):
        if path:
            return self._forward("lset", path, value, **kw)
        if isinstance(value, dict):
            # Rebuild the children from scratch, recursing so that nested
            # dicts become nested subtrees
            self.value = {}
            for name, val in value.items():
                subtree = MockTree(name)
                self.value[name] = subtree
                subtree.lset([], val)
        else:
            self.value = value

    def llist(self, path, **kw):
        if path:
            return self._forward("llist", path, **kw)
        if self.is_leaf:
            return []
        return sorted(self.value.keys())

    def lcreate(self, path, value=None, **kw):
        if not path:
            return super(MockTree, self).lcreate(path, value, **kw)
        if self.is_leaf:
            # Creating a child turns a leaf into a tree node, dropping the
            # scalar it held
            subtree = MockTree(path[0])
            self.value = {path[0]: subtree}
        else:
            subtree = self.value.get(path[0])
            if subtree is None:
                subtree = MockTree(path[0])
                self.value[path[0]] = subtree
        if len(path) == 1:
            subtree.lset([], value)
        else:
            # Intermediate nodes are created as needed
            subtree.lcreate(path[1:], value)

    def ldelete(self, path, **kw):
        if not path or self.is_leaf:
            return super(MockTree, self).ldelete(path, **kw)
        if len(path) == 1:
            del self.value[path[0]]
        else:
            return self._forward("ldelete", path, **kw)

    def ldoc(self, path, **kw):
        return None