nutools/lib/nulib/python/deploydb/objects.py

1235 lines
45 KiB
Python

# -*- coding: utf-8 mode: python -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
"""Définition des blocs de base: objets, liens, prédicats
"""
__all__ = (
'XT',
'listP', 'fileP', 'pathP', 'mpathP', 'lowerP',
'catalog',
'ADD_UNIQUE', 'ADD', 'REMOVE', 'RESET_ADD',
'Link',
'Object',
'Fact',
)
import os
from os import path
from glob import glob
from .utils import *
from .expr import Expr
# méthodes de mise à jour des attributs
ADD_UNIQUE = object()
ADD = object()
REMOVE = object()
RESET_ADD = object()
# valeurs marqeurs pour les méthodes qui doivent pouvoir utiliser None comme
# valeur
_RAISE_EXCEPTION = object()
_UNDEF = object()
# fonctions diverses
def isindex(k):
return isnum(k) or isinstance(k, slice)
def XT(dictOrClass, **kw):
"""créer un objet dictionnaire avec la valeur Dict ou Class.ATTRS augmenté des valeurs de **kw
"""
if isinstance(dictOrClass, dict): d = dictOrClass
else: d = dictOrClass.ATTRS
return dict(d, **kw)
# parsers
class ListP(object):
"""parser qui prend une liste de valeurs séparées par des virgules
"""
def parse(self, l):
if l is None: return None
return flattenstr(listof(l))
listP = ListP()
class FileP(object):
"""parser qui prend une liste de fichiers séparées par ':'
les chemins sont tilde-expansés
"""
def parse(self, f):
fs = flattenstr(listof(f), sep=':')
return [path.expanduser(f) for f in fs]
fileP = FileP()
class PathP(object):
"""parser qui prend une liste de chemins séparées par ':'
les chemins sont tilde-expansés et transformés en chemins absolus
"""
def parse(self, p):
ps = flattenstr(listof(p), sep=':')
return [path.abspath(path.expanduser(p)) for p in ps]
pathP = PathP()
class MpathP(object):
"""parser qui prend une liste de chemins séparées par ':'
les chemins sont tilde-expansés et transformés en chemins absolus, mais
uniquement s'ils contiennent un caractère '/'
"""
def parse(self, p):
ps = flattenstr(listof(p), sep=':')
pms = []
for p in ps:
pm = path.expanduser(p)
if '/' in pm: pm = path.abspath(pm)
pms.append(pm)
return pms
mpathP = MpathP()
class LowerP(object):
"""parser qui transforme en minuscule les valeurs
"""
def parse(self, s):
if s is None: return None
return s.lower()
lowerP = LowerP()
################################################################################
# Catalogue des objets
class Catalog(object):
# map otype --> oclass
_t2cmap, t2cmap = None, property(lambda self: self._t2cmap)
# map otype --> id2object
_t2iomap, t2iomap = None, property(lambda self: self._t2iomap)
# liste d'instance de facts
_facts, facts = None, property(lambda self: self._facts)
# types d'objets qui doivent être résolus en premier
_rf_otypes = None
def __init__(self):
self._t2cmap = {}
self._t2iomap = {}
self._facts = []
self._rf_otypes = set()
def register(self, oclass, otype=None, resolve_first=False):
if otype is None:
otype = oclass.__name__.lower()
self._t2cmap[otype] = oclass
self._rf_otypes.add(otype)
############################################################################
# Création
def create_object(self, otype, oid, *values, **attrs):
"""créer un nouvel objet du type spécifié ou le mettre à jour
"""
if isinstance(otype, type):
oclass = otype
otype = otype.TYPE
else:
oclass = self._t2cmap.get(otype, None)
if not self._t2iomap.has_key(otype):
self._t2iomap[otype] = {}
i2omap = self._t2iomap[otype]
if not i2omap.has_key(oid):
if oclass is None:
object = Object(oid, *values, **attrs)
object.otype = otype
else:
object = oclass(oid, *values, **attrs)
i2omap[oid] = object
else:
object = i2omap[oid]
if values: object.update('values', values)
if attrs: object.update(attrs)
return object
def create_fact(self, sotype, soid, verb, totype, toid, **attrs):
"""créer un nouveau fait entre les deux objets spécifiés
"""
if isinstance(sotype, type): sotype = sotype.TYPE
if isinstance(totype, type): totype = totype.TYPE
fact = Fact(sotype, soid, verb, totype, toid, **attrs)
self._facts.append(fact)
return fact
def resolve(self):
"""résoudre tous les objets et tous les faits
"""
rf_otypes = self._rf_otypes
# d'abord résoudre les types d'objets mentionnés dans rf_otypes
for otype in rf_otypes:
i2omap = self.t2iomap.get(otype, None)
if i2omap is None: continue
for id, object in i2omap.items():
object.resolve(self)
# puis résoudre les autres types d'objets
for otype, i2omap in self.t2iomap.items():
if otype in rf_otypes: continue
for id, object in i2omap.items():
object.resolve(self)
# puis résoudre tous les faits
for fact in self.facts:
fact.resolve(self)
return self
############################################################################
# Consultation
def get(self, otype, oid, default=_RAISE_EXCEPTION, create=True, resolve=True):
"""obtenir un objet par son type et son identifiant
par défaut, le créer s'il n'existe pas. avec create=True, l'argument
default est ignoré.
si create=False, default indique la valeur à retourner. lancer une
exception ValueError si default=_RAISE_EXCEPTION (c'est la valeur par
défaut)
"""
object = None
i2omap = self._t2iomap.get(otype, None)
if i2omap is not None: object = i2omap.get(oid, None)
if object is None and not create:
if default is _RAISE_EXCEPTION:
raise ValueError("%s:%s: not found" % (otype, oid))
else:
return default
if object is None:
object = self.create_object(otype, oid)
if resolve:
object.resolve(self)
return object
############################################################################
# Recherches
def find_tobjects(self, totype, objects, create=True, resolve=True):
"""trouver les objets liés de type totype dans la objects
"""
objects = listof(objects)
if totype is not None:
# mettre dans un dictionnaire et indexer sur oid pour éviter les
# doublons
tobjects = {}
for object in objects:
if object.otype == totype:
tobjects[object.oid] = object
else:
lobjects = [link.resolve(self, None, create, resolve) for link in object.get_links(totype)]
for lobject in lobjects:
if lobject is None: continue
tobjects[lobject.oid] = lobject
objects = tobjects.values()
return objects
def filter_objects(self, expr, objects):
"""ne garder dans la liste objects que les objets qui correspondent à
l'expression.
"""
objects = listof(objects)
return [object for object in objects if Expr.match_term(expr, object)]
def find_objects(self, otype=None, oid=None,
totype=None, expr=None,
create=True, resolve=True):
"""chercher les objets correspondant à otype et/ou oid
si totype!=None, alors chercher les objets liés qui sont de ce type
"""
otypes = listof(otype, None)
oids = listof(oid, None)
if otypes is not None and oids is not None:
objects = []
for otype in otypes:
i2omap = self.t2iomap.get(otype, {})
objects.extend([object for object in i2omap.values() if object.oid in oids])
elif otypes is not None and oids is None:
objects = []
for otype in otypes:
i2omap = self.t2iomap.get(otype, {})
objects.extend(i2omap.values())
elif oids is not None and otypes is None:
objects = []
for otype, i2omap in self.t2iomap.items():
objects.extend([object for object in i2omap.values() if object.oid in oids])
else:
objects = []
for otype, i2omap in self.t2iomap.items():
objects.extend(i2omap.values())
if resolve:
map(lambda object: object.resolve(self), objects)
objects = self.find_tobjects(totype, objects, create, resolve)
if expr is not None:
objects = self.filter_objects(expr, objects)
return objects
def filter_facts(self, expr, facts):
"""ne garder dans la liste facts que les faits qui correspondent à l'expression
"""
facts = listof(facts)
return [(fact, tsobjects, ttobjects)
for (fact, tsobjects, ttobjects) in facts
if Expr.match_term(expr, fact)]
def find_facts(self, sotype=None, soid=None, verb=None, totype=None, toid=None,
tsotype=None, tsexpr=None,
ttotype=None, ttexpr=None,
expr=None,
resolve=True):
"""chercher les faits correspondant aux arguments
retourner une liste de tuples (fact, tsobjects, ttobjects) où
* fact est le fait original
* tsobjects sont les objets sources liés si tsotype et tsexpr sont
spécifiés
* ttobjects sont les objets destination liés si ttotype et ttexpr sont
spécifiés
"""
sotypes = listof(sotype, None)
soids = listof(soid, None)
verbs = listof(verb, None)
totypes = listof(totype, None)
toids = listof(toid, None)
facts = []
for fact in self.facts:
if sotypes is not None and fact.sotype not in sotypes:
continue
if soids is not None and fact.soid not in soids:
continue
if verbs is not None and fact.verb not in verbs:
continue
if totypes is not None and fact.totype not in totypes:
continue
if toids is not None and fact.toid not in toids:
continue
tsobjects = [fact.sresolve(self, None, True)]
ttobjects = [fact.tresolve(self, None, True)]
if tsotype is not None:
# chercher les objets liés dans la source
tsobjects = self.filter_objects(tsexpr, self.find_tobjects(tsotype, tsobjects))
if not tsobjects: continue
if ttotype is not None:
# chercher les objets liés dans la source
ttobjects = self.filter_objects(ttexpr, self.find_tobjects(ttotype, ttobjects))
if not ttobjects: continue
facts.append((fact, tsobjects, ttobjects))
if resolve:
for fact, tsobjects, ttobjects in facts:
fact.resolve(self)
if expr is not None:
facts = self.filter_facts(expr, facts)
return facts
############################################################################
# Divers
def dump(self):
self.resolve()
for otype, i2omap in self.t2iomap.items():
print "OBJECTS:%s:" % otype
for id, object in i2omap.items():
object.dump(" ")
if self.facts:
print "FACTS:"
for fact in self.facts:
fact.dump(" ")
################################################################################
# liens
class Link(object):
"""Un lien vers une référence d'un objet
Un lien a le type de l'objet cible (propriété `otype`), son identifiant
(propriété `oid`), et des attributs multivalués (toutes les autres
propriétés)
"""
ATTRS = dict(otype=None, oid=None, attrs=None)
_rw_attrs = set(('otype', 'oid'))
_ro_attrs = set(('attrs',))
_reserved_attrs = _rw_attrs | _ro_attrs
_otype = None
_oid = None
_attrs = None
def __init__(self, otype=None, oid=None, **attrs):
self.__dict__['_otype'] = otype
self.__dict__['_oid'] = oid
self.__dict__['_attrs'] = {}
for attr, value in attrs.items():
self.update(attr, value)
def __parse(self, attr, value):
"""obtenir le parser qui permet de s'assurer que value est dans le bon
format pour l'attribut attr.
"""
if isindex(attr): parser = None
else: parser = self.ATTRS.get(attr, None)
if parser is None: return value
elif isseq(value): return flattenseq(map(parser.parse, value))
else: return parser.parse(value)
# accès aux attributs
def __getattr__(self, attr):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
try:
return self._attrs[attr]
except KeyError:
raise AttributeError(attr)
def __setattr__(self, attr, value):
value = self.__parse(attr, value)
if attr in self._rw_attrs:
return super(Link, self).__setattr__('_%s' % attr, value)
elif attr in self._ro_attrs:
raise AttributeError(attr)
else:
self._attrs[attr] = listof(value)
def __delattr__(self, attr):
if attr in self._reserved_attrs:
raise AttributeError(attr)
try:
del self._attrs[attr]
except KeyError:
raise AttributeError(attr)
def __getitem__(self, attr):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
else:
return self._attrs[attr]
def __setitem__(self, attr, value):
value = self.__parse(attr, value)
if attr in self._rw_attrs:
return super(Link, self).__setattr__('_%s' % attr, value)
elif attr in self._ro_attrs:
raise KeyError(attr)
else:
self._attrs[attr] = listof(value)
def __delitem__(self, attr):
if attr in self._reserved_attrs:
raise KeyError(attr)
else:
del self._attrs[attr]
def first(self, attr, default=None):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
if self._attrs.has_key(attr):
values = self._attrs[attr]
if values: return values[0]
return default
def get(self, attr, default=None):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
else:
return self._attrs.get(attr, default)
def has_key(self, attr):
"""tester l'existence d'un attribut"""
if attr in self._reserved_attrs:
return True
else:
return self._attrs.has_key(attr)
@property
def known_attrs(self):
"""obtenir une liste triée d'attributs faisant partie du schéma"""
return sorted(list(self.ATTRS.keys()))
@property
def misc_attrs(self):
"""obtenir une liste triée d'attributs ne faisant pas partie du schéma"""
schema_attrs = set(self.ATTRS.keys())
defined_attrs = set(self._attrs.keys())
return sorted(list(defined_attrs - schema_attrs))
@property
def missing_attrs(self):
"""obtenir une liste triée d'attributs faisant partie du schéma mais non définis"""
schema_attrs = set(self.ATTRS.keys())
defined_attrs = set(self._attrs.keys())
return sorted(list(schema_attrs - defined_attrs - self._reserved_attrs))
def update(self, attr, value=None, update_type=ADD_UNIQUE):
"""mettre à jour l'attribut spécifié
si l'attribut n'existe pas, il est créé. sinon, la liste des valeurs de
l'attribut est étendue.
si value==None, aucune mise à jour n'est effectuée
si attr est une instance de dictionnaire, mettre à jour *tous* les
attributs spécifiés.
update_type est la méthode de mise à jour
"""
if isinstance(attr, dict):
attrs = attr
for attr, value in attrs.items():
self.update(attr, value, update_type)
return self
if attr in self._reserved_attrs:
raise KeyError(attr)
if value is not None:
values = listof(self.__parse(attr, value))
if not self._attrs.has_key(attr):
self._attrs[attr] = []
attr = self._attrs[attr]
if update_type is ADD_UNIQUE:
for value in values:
if value not in attr:
attr.append(value)
elif update_type is ADD:
attr.extend(values)
elif update_type is REMOVE:
for value in values:
if value in attr:
attr.remove(value)
elif update_type is RESET_ADD:
attr[:] = values
return self
def set_defaults(self, attr, value=None, update_type=ADD_UNIQUE):
"""Mettre à jour l'attribut spécifié s'il n'existe pas
si value==None, aucune mise à jour n'est effectuée
si attr est une instance de dictionnaire, mettre à jour *tous* les
attributs spécifiés s'ils n'existent pas.
"""
if isinstance(attr, dict):
attrs = attr
for attr, value in attrs.items():
self.set_defaults(attr, value, update_type)
return self
if attr in self._reserved_attrs:
raise KeyError(attr)
if not self._attrs.has_key(attr):
self.update(attr, value, update_type)
return self
def clone(self):
"""cloner ce lien"""
return self.__class__(self._otype, self._oid, **self._attrs)
# catalogue
def resolve(self, catalog, default=_RAISE_EXCEPTION, create=True, resolve=True):
"""obtenir l'objet lié
"""
return catalog.get(self.otype, self.oid, default, create, resolve)
# divers
def _dump_idtype(self, indent, prefix=None):
if prefix is None: prefix = ''
else: prefix = "%s " % prefix
print "%s%s%s:%s" % (indent, prefix, self._otype, self._oid)
def _dump_attrs(self, indent):
attrs = self._attrs
missing_attrs = self.missing_attrs
if attrs or missing_attrs:
print "%s attrs:" % indent
for name, values in attrs.items():
if len(values) == 1:
print "%s %s=%s" % (indent, name, repr(values[0]))
else:
print "%s %s=(%s)" % (indent, name, ', '.join(map(repr, values)))
for name in missing_attrs:
print "%s %s=<missing>" % (indent, name)
def dump(self, indent='', prefix=None):
"""Afficher l'identifiant, le type et les attributs de ce lien
"""
self._dump_idtype(indent, prefix)
self._dump_attrs(indent)
################################################################################
# objets
class MetaObject(type):
def __init__(cls, name, bases, attrs):
type.__init__(cls, name, bases, attrs)
if cls.__dict__.get('TYPE', None) is None:
cls.TYPE = cls.__name__.lower()
register = not cls.__dict__.get('__NO_AUTO_REGISTER__', False)
resolve_first = cls.__dict__.get('__RESOLVE_FIRST__', False)
if register:
catalog.register(cls, cls.TYPE, resolve_first)
class Object(object):
"""Un objet générique
Un objet a un identifiant (propriété `oid`), un type (propriété `otype`), une
liste de valeurs (propriété `values`), des liens vers d'autres objets
(propriété `links`) et des attributs multivalués (toutes les autres propriétés).
Le type de l'objet définit un schéma, c'est à dire un ensemble d'attributs
spécifiques avec des valeurs par défaut. Les attributs du schéma sont les
attributs connus (propriété known_attrs), les autres sont les attributs
divers (propriété misc_attrs)
"""
__metaclass__ = MetaObject
__NO_AUTO_REGISTER__ = True
__RESOLVE_FIRST__ = False
ATTRS = dict(otype=None, oid=None, values=None, attrs=None, links=None)
TYPE = 'object'
_rw_attrs = set(('otype', 'oid'))
_ro_attrs = set(('values', 'attrs', 'links'))
_reserved_attrs = _rw_attrs | _ro_attrs
_otype = None
_oid = None
_values = None
_attrs = None
_links = None
_resolved = None
def __init__(self, oid=None, *values, **attrs):
self.__dict__['_otype'] = self.TYPE
self.__dict__['_oid'] = oid
self.__dict__['_values'] = []
self.__dict__['_attrs'] = {}
self.__dict__['_links'] = {}
self.__dict__['_resolved'] = False
self.update('values', values)
for attr, value in attrs.items():
self.update(attr, value)
def __parse(self, attr, value):
"""obtenir le parser qui permet de s'assurer que value est dans le bon
format pour l'attribut attr. Utiliser attr==None pour l'attribut values
"""
if isindex(attr): attr = 'values'
parser = self.ATTRS.get(attr, None)
if parser is None: return value
elif isseq(value): return flattenseq(map(parser.parse, value))
else: return parser.parse(value)
# accès aux valeurs (via un index numérique) et aux attributs (via le nom de
# l'attribut)
def __getattr__(self, attr):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
try:
if isindex(attr): return self._values[attr]
else: return self._attrs[attr]
except KeyError:
raise AttributeError(attr)
def __setattr__(self, attr, value):
value = self.__parse(attr, value)
if attr == 'values':
self._values[:] = listof(value)
elif attr in self._rw_attrs:
super(Object, self).__setattr__('_%s' % attr, value)
elif attr in self._ro_attrs:
raise AttributeError(attr)
elif attr in self.__dict__:
super(Object, self).__setattr__(attr, value)
elif isindex(attr):
self._values[attr] = value
else:
self._attrs[attr] = listof(value)
self.__dict__['_resolved'] = False
def __delattr__(self, attr):
if attr in self._reserved_attrs:
raise AttributeError(attr)
try:
if isindex(attr): del self._values[attr]
else: del self._attrs[attr]
except KeyError:
raise AttributeError(attr)
self.__dict__['_resolved'] = False
def __getitem__(self, attr):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
elif isindex(attr):
return self._values[attr]
else:
return self._attrs[attr]
def __setitem__(self, attr, value):
value = self.__parse(attr, value)
if attr == 'values':
self._values[:] = listof(value)
elif attr in self._rw_attrs:
return super(Object, self).__setattr__('_%s' % attr, value)
elif attr in self._ro_attrs:
raise KeyError(attr)
elif isindex(attr):
self._values[attr] = value
else:
self._attrs[attr] = listof(value)
self.__dict__['_resolved'] = False
def __delitem__(self, attr):
if attr in self._reserved_attrs:
raise KeyError(attr)
elif isindex(attr):
del self._values[attr]
else:
del self._attrs[attr]
self.__dict__['_resolved'] = False
# accès spécifique aux valeurs
__nonzero__ = lambda self: True
def __len__(self):
"""obtenir le nombre de valeurs"""
return len(self._values)
def __iter__(self):
"""obtenir un itérateur sur les valeurs"""
return self._values.__iter__()
def __reversed__(self):
"""obtenir la liste des valeurs inversée"""
return self._values.__reversed__()
def __contains__(self, item):
"""tester l'existence d'une valeur"""
return item in self._values
def append(self, value):
"""ajouter une valeur"""
return self._values.append(value)
def insert(self, index, value):
"""insérer une valeur à la position spécifiée"""
return self._values.insert(index, value)
def extend(self, seq):
"""étendre la liste des valeurs"""
return self._values.extend(seq)
# accès spécifique aux attributs
def first(self, attr, default=None):
"""obtenir la première valeur de l'attribut"""
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
if self._attrs.has_key(attr):
values = self._attrs[attr]
if values: return values[0]
return default
def get(self, attr, default=None):
"""obtenir l'attribut sous forme de liste"""
if attr in self._reserved_attrs:
return listof(getattr(self, '_%s' % attr))
else:
return self._attrs.get(attr, default)
def has_key(self, attr):
"""tester l'existence d'un attribut"""
if attr in self._reserved_attrs:
return True
else:
return self._attrs.has_key(attr)
@property
def known_attrs(self):
"""obtenir une liste triée d'attributs faisant partie du schéma"""
return sorted(list(self.ATTRS.keys()))
@property
def known_rw_attrs(self):
"""obtenir une liste triée des attributs faisant partie du schéma accessibles en écriture"""
return sorted(list(set(self.ATTRS.keys()) - self._ro_attrs))
@property
def misc_attrs(self):
"""obtenir une liste triée d'attributs ne faisant pas partie du schéma"""
schema_attrs = set(self.ATTRS.keys())
defined_attrs = set(self._attrs.keys())
return sorted(list(defined_attrs - schema_attrs))
@property
def missing_attrs(self):
"""obtenir une liste triée d'attributs faisant partie du schéma mais non définis"""
schema_attrs = set(self.ATTRS.keys())
defined_attrs = set(self._attrs.keys())
return sorted(list(schema_attrs - defined_attrs - self._reserved_attrs))
def update(self, attr, value=None, update_type=ADD_UNIQUE):
"""mettre à jour l'attribut spécifié
si l'attribut n'existe pas, il est créé. sinon, la liste des valeurs de
l'attribut est étendue.
si value==None, aucune mise à jour n'est effectuée
si attr est une instance de dictionnaire, mettre à jour *tous* les
attributs spécifiés.
update_type est la méthode de mise à jour
"""
if isinstance(attr, dict):
attrs = attr
for attr, value in attrs.items():
self.update(attr, value, update_type)
return self
if attr == 'values': pass
elif attr in self._reserved_attrs:
raise KeyError(attr)
if value is not None:
values = listof(self.__parse(attr, value))
if attr == 'values':
attr = self._values
else:
if not self._attrs.has_key(attr): self._attrs[attr] = []
attr = self._attrs[attr]
if update_type is ADD_UNIQUE:
for value in values:
if value not in attr:
attr.append(value)
elif update_type is ADD:
attr.extend(values)
elif update_type is REMOVE:
for value in values:
if value in attr:
attr.remove(value)
elif update_type is RESET_ADD:
attr[:] = values
self.__dict__['_resolved'] = False
return self
def set_defaults(self, attr, value=None, update_type=ADD_UNIQUE):
"""Mettre à jour l'attribut spécifié s'il n'existe pas
si value==None, aucune mise à jour n'est effectuée
utiliser attr==None pour mettre à jour l'attribut values
si attr est une instance de dictionnaire, mettre à jour *tous* les
attributs spécifiés.
"""
if isinstance(attr, dict):
attrs = attr
for attr, value in attrs.items():
self.set_defaults(attr, value, update_type)
return self
if attr == 'values':
if not self._values:
self.update('values', value, update_type)
elif attr in self._reserved_attrs:
raise KeyError(attr)
elif not self._attrs.has_key(attr):
self.update(attr, value, update_type)
return self
def clone(self):
"""cloner cet objet"""
o = self.__class__(self._oid, self._values, **self._attrs)
# XXX cloner aussi les liens
return o
# gestion des liens
def linkto(self, loi, otype=None, **attrs):
"""lier vers une référence d'un autre objet
loi peut être:
* une instance de Link
* une instance d'Object
* un identifiant d'objet, auquel cas otype est requis
@return l'instance du lien créé
"""
if isinstance(loi, Link):
create = False
link = loi.clone()
elif isinstance(loi, Object):
otype = loi.otype
oid = loi.oid
create = True
else:
if otype is None: raise ValueError('otype is required')
oid = loi
create = True
if create:
link = Link(otype, oid, **attrs)
else:
link.update(attrs)
links = self._links
if not links.has_key(link.otype):
links[otype] = []
links[otype].append(link)
self.__dict__['_resolved'] = False
return link
def get_links(self, otype=None, clone=False):
"""retourner les liens vers les objets du type spécifié
si otype==None, alors retourner tous les liens
si clone==True, faire un clone des liens avant de les retourner
"""
if otype is None:
links = []
for otype, tmplinks in self._links.items():
links.extend(tmplinks)
else:
links = listof(self._links.get(otype, ()))
if clone:
links = [link.clone() for link in links]
return links
# catalogue
def resolve_basedir(self, basedirs, dirs=False, files=False,
filespec=None,
dir_attr='dir', file_attr='file',
parentdir_attr='parentdir'):
"""retourner les chemins absolus des fichiers (et/ou répertoires) trouvés dans
les répertoires basedirs
si les arguments dir_attr, file_attr, parentdir_attr ne sont pas None
(ce qui est le cas par défaut), alors l'attribut est mis à jour avec
respectivement les répertoires, les fichiers, et les répertoires parent
trouvés
"""
filespecs = listof(filespec, None)
result = []
for basedir in basedirs:
basedir = path.expanduser(basedir)
basedir = path.abspath(basedir)
for name in os.listdir(basedir):
if filespecs is not None:
found = False
for filespec in filespecs:
if fnmatch(name, filespec):
found = True
break
if not found: continue
pf = path.join(basedir, name)
if path.isdir(pf) and (dirs or dirs == files):
result.append(pf)
if dir_attr is not None:
self.update(dir_attr, pf)
elif path.isfile(pf) and (files or dirs == files):
result.append(pf)
if file_attr is not None:
self.update(file_attr, pf)
if parentdir_attr is not None:
self.update(parentdir_attr, map(path.dirname, result))
return result
def resolve_filespec(self, filespecs, dirs=False, files=False,
dir_attr='dir', file_attr='file',
parentdir_attr='parentdir'):
"""retourner les chemins absolus des fichiers (et/ou répertoires) correspondant
aux modèles filespecs (qui doivent être de type glob)
si les arguments dir_attr, file_attr, parentdir_attr ne sont pas None
(ce qui est le cas par défaut), alors l'attribut est mis à jour avec
respectivement les répertoires, les fichiers, et les répertoires parent
trouvés
"""
result = []
for filespec in filespecs:
filespec = path.expanduser(filespec)
for file in glob(filespec):
pf = path.abspath(file)
if path.isdir(pf) and (dirs or dirs == files):
result.append(pf)
if dir_attr is not None:
self.update(dir_attr, pf)
elif path.isfile(pf) and (files or dirs == files):
result.append(pf)
if file_attr is not None:
self.update(file_attr, pf)
if parentdir_attr is not None:
self.update(parentdir_attr, map(path.dirname, result))
return result
def _resolve(self, catalog):
"""à surcharger dans les classes dérivées"""
values = []
search_basedir = self.get('search_basedir', ())
files = 'files' in search_basedir
dirs = 'dirs' in search_basedir
basedir = self.get('basedir', None)
if basedir is not None:
values.extend(self.resolve_basedir(basedir, files=files, dirs=dirs))
dirspec = self.get('dirspec', None)
if dirspec is not None:
values.extend(self.resolve_filespec(dirspec, dirs=True))
filespec = self.get('filespec', None)
if filespec is not None:
values.extend(self.resolve_filespec(filespec, files=True))
if not self.values:
self.values = values
def resolve(self, catalog, recursive=True):
"""normaliser cet objet et compléter les données manquantes. si recursive==True
(la valeur par défaut), normaliser aussi les objets liés.
@return True si l'objet a été modifié, False si l'objet avait déjà été résolu
"""
if self._resolved: return False
self._resolve(catalog)
if recursive:
for otype, links in self.links.items():
for link in links:
link.resolve(catalog)
self.__dict__['_resolved'] = True
return True
# divers
def _dump_idtype(self, indent):
print "%s%s:%s" % (indent, self._otype, self._oid)
def _dump_values(self, indent):
values = self._values
if len(values) == 0:
pass
elif len(values) == 1:
print "%s values=%s" % (indent, repr(values[0]))
else:
print "%s values=(%s)" % (indent, ', '.join(map(repr, values)))
def _dump_attrs(self, indent):
attrs = self._attrs
missing_attrs = self.missing_attrs
if attrs or missing_attrs:
print "%s attrs:" % indent
for name, values in attrs.items():
if len(values) == 1:
print "%s %s=%s" % (indent, name, repr(values[0]))
else:
print "%s %s=(%s)" % (indent, name, ', '.join(map(repr, values)))
for name in missing_attrs:
print "%s %s=<missing>" % (indent, name)
def _dump_links(self, indent):
if self.links:
for ltype, links in self.links.items():
for link in links:
link.dump("%s " % indent, '+->')
def dump(self, indent=''):
"""Afficher l'identifiant, le type, les valeurs, les attributs et les liens de cet objet
"""
self._dump_idtype(indent)
self._dump_values(indent)
self._dump_attrs(indent)
self._dump_links(indent)
def __repr__(self):
oid = repr(self._oid)
values = self._values
if values: values = ", %s" % ', '.join(map(repr, values))
else: values = ""
attrs = self._attrs
if attrs: attrs = ", **%s" % repr(attrs)
else: attrs = ""
return "%s(%s%s%s)" % (self.__class__.__name__, oid, values, attrs)
################################################################################
# Faits
class Fact(object):
"""Un fait liant deux références d'objets
Le fait a le type de l'objet source (propriété `sotype`), son identifiant
(propriété `soid`), le verbe décrivant le lien (propriété `verb`), le type
de l'objet cible (propriété `totype`), son identifiant (propriété `toid`),
et des attributs multivalués (toutes les autres propriétés)
"""
ATTRS = dict(
sotype=None, soid=None,
verb=None,
totype=None, toid=None,
attrs=None,
)
_rw_attrs = set(('sotype', 'soid', 'verb', 'totype', 'toid'))
_ro_attrs = set(('attrs',))
_reserved_attrs = _rw_attrs | _ro_attrs
_sotype = None
_soid = None
_verb = None
_totype = None
_toid = None
_attrs = None
def __init__(self, sotype=None, soid=None, verb=None, totype=None, toid=None, **attrs):
if verb.startswith('~'):
verb = verb[1:]
tmpotype, tmpoid = totype, toid
totype, toid = sotype, soid
sotype, soid = tmpotype, tmpoid
self.__dict__['_sotype'] = sotype
self.__dict__['_soid'] = soid
self.__dict__['_verb'] = verb
self.__dict__['_totype'] = totype
self.__dict__['_toid'] = toid
self.__dict__['_attrs'] = {}
for attr, value in attrs.items():
self.update(attr, value)
def __parse(self, attr, value):
"""obtenir le parser qui permet de s'assurer que value est dans le bon
format pour l'attribut attr.
"""
if isindex(attr): parser = None
else: parser = self.ATTRS.get(attr, None)
if parser is None: return value
elif isseq(value): return flattenseq(map(parser.parse, value))
else: return parser.parse(value)
# accès aux attributs
def __getattr__(self, attr):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
try:
return self._attrs[attr]
except KeyError:
raise AttributeError(attr)
def __setattr__(self, attr, value):
value = self.__parse(attr, value)
if attr in self._rw_attrs:
return super(Link, self).__setattr__('_%s' % attr, value)
elif attr in self._ro_attrs:
raise AttributeError(attr)
else:
self._attrs[attr] = listof(value)
def __delattr__(self, attr):
if attr in self._reserved_attrs:
raise AttributeError(attr)
try: del self._attrs[attr]
except KeyError: raise AttributeError(attr)
def __getitem__(self, attr):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
else:
return self._attrs[attr]
def __setitem__(self, attr, value):
value = self.__parse(attr, value)
if attr in self._rw_attrs:
return super(Link, self).__setattr__('_%s' % attr, value)
elif attr in self._ro_attrs:
raise KeyError(attr)
else:
self._attrs[attr] = listof(value)
def __delitem__(self, attr):
if attr in self._reserved_attrs: raise KeyError(attr)
else: del self._attrs[attr]
def first(self, attr, default=None):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
if self._attrs.has_key(attr):
values = self._attrs[attr]
if values: return values[0]
return default
def get(self, attr, default=None):
if attr in self._reserved_attrs:
return getattr(self, '_%s' % attr)
else:
return self._attrs.get(attr, default)
def has_key(self, attr):
"""tester l'existence d'un attribut"""
if attr in self._reserved_attrs:
return True
else:
return self._attrs.has_key(attr)
@property
def known_attrs(self):
"""obtenir une liste triée d'attributs faisant partie du schéma"""
return sorted(list(self.ATTRS.keys()))
@property
def misc_attrs(self):
"""obtenir une liste triée d'attributs ne faisant pas partie du schéma"""
schema_attrs = set(self.ATTRS.keys())
defined_attrs = set(self._attrs.keys())
return sorted(list(defined_attrs - schema_attrs))
@property
def missing_attrs(self):
"""obtenir une liste triée d'attributs faisant partie du schéma mais non définis"""
schema_attrs = set(self.ATTRS.keys())
defined_attrs = set(self._attrs.keys())
return sorted(list(schema_attrs - defined_attrs - self._reserved_attrs))
def update(self, attr, value=None, update_type=ADD_UNIQUE):
"""mettre à jour l'attribut spécifié
si l'attribut n'existe pas, il est créé. sinon, la liste des valeurs de
l'attribut est étendue.
si value==None, aucune mise à jour n'est effectuée
si attr est une instance de dictionnaire, mettre à jour *tous* les
attributs spécifiés.
update_type est la méthode de mise à jour
"""
if isinstance(attr, dict):
attrs = attr
for attr, value in attrs.items():
self.update(attr, value, update_type)
return self
if attr in self._reserved_attrs:
raise KeyError(attr)
if value is not None:
values = listof(self.__parse(attr, value))
if not self._attrs.has_key(attr): self._attrs[attr] = []
attr = self._attrs[attr]
if update_type is ADD_UNIQUE:
for value in values:
if value not in attr:
attr.append(value)
elif update_type is ADD:
attr.extend(values)
elif update_type is REMOVE:
for value in values:
if value in attr:
attr.remove(value)
elif update_type is RESET_ADD:
attr[:] = values
return self
def set_defaults(self, attr, value=None, update_type=ADD_UNIQUE):
"""Mettre à jour l'attribut spécifié s'il n'existe pas
si value==None, aucune mise à jour n'est effectuée
si attr est une instance de dictionnaire, mettre à jour *tous* les
attributs spécifiés s'ils n'existent pas.
"""
if isinstance(attr, dict):
attrs = attr
for attr, value in attrs.items():
self.set_defaults(attr, value, update_type)
return self
if attr in self._reserved_attrs:
raise KeyError(attr)
if not self._attrs.has_key(attr):
self.update(attr, value, update_type)
return self
def clone(self):
"""cloner ce lien"""
return self.__class__(self._sotype, self._soid, self._verb, self._totype, self._toid, **self._attrs)
# catalogue
def sresolve(self, catalog, default=_RAISE_EXCEPTION, create=True, resolve=True):
return catalog.get(self.sotype, self.soid, default, create, resolve)
def tresolve(self, catalog, default=_RAISE_EXCEPTION, create=True, resolve=True):
return catalog.get(self.totype, self.toid, default, create, resolve)
def resolve(self, catalog, default=_RAISE_EXCEPTION, create=True, resolve=True):
"""obtenir les objets liés (source, verb, target)
"""
source = catalog.get(self.sotype, self.soid, default, create, resolve)
target = catalog.get(self.totype, self.toid, default, create, resolve)
return (source, self.verb, target)
# divers
def _dump_idtype(self, indent, prefix=None):
if prefix is None: prefix = ''
else: prefix = "%s " % prefix
print "%s%s%s:%s %s %s:%s " % (indent, prefix, self._sotype, self._soid, self._verb, self._totype, self._toid)
def _dump_attrs(self, indent):
attrs = self._attrs
missing_attrs = self.missing_attrs
if attrs or missing_attrs:
print "%s attrs:" % indent
for name, values in attrs.items():
if len(values) == 1:
print "%s %s=%s" % (indent, name, repr(values[0]))
else:
print "%s %s=(%s)" % (indent, name, ', '.join(map(repr, values)))
for name in missing_attrs:
print "%s %s=<missing>" % (indent, name)
def dump(self, indent='', prefix=None):
"""Afficher l'identifiant, le type et les attributs de ce lien
"""
self._dump_idtype(indent, prefix)
self._dump_attrs(indent)
################################################################################
# variables globales
catalog = Catalog()