385 lines
13 KiB
Python

# -*- coding: utf-8 mode: python -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
# Names exported when this module is star-imported.
__all__ = (
'LazyObject',
'dbconfig', 'fixsqlite', 'lazydb', 'oradbinfos',
'ColSchema', 'Col',
'RowSchema', 'Row', 'RowCtl',
'TableSchema',
'incd', 'excd',
'Migration',
)
from os import path
from types import StringType
import csv
from ..base import Undef, odict, seqof
from ..ext import web
from .api import uval
Undef = Undef() # instance specific to this module (rebinds the imported class name)
class LazyObject(object):
    """Proxy that forwards to a target object built on first use.

    Attribute, item and call access are delegated to the object returned by
    `_resolve()`. That object is created the first time it is needed and is
    then kept in the instance `__dict__` under the `_object` key.
    """

    _object = None  # resolved target; stays None until the first delegation

    def __init__(self):
        self.__dict__['_object'] = None

    def _resolve(self):
        """Hook for subclasses: build and return the target object."""

    def __object(self):
        # A None result is not cached: _resolve() runs again on next access.
        if self._object is not None:
            return self._object
        self.__dict__['_object'] = self._resolve()
        return self._object

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        target = self.__object()
        return getattr(target, name)

    def __setattr__(self, name, value):
        own = self.__dict__
        if name not in own:
            setattr(self.__object(), name, value)
        else:
            own[name] = value

    def __delattr__(self, name):
        own = self.__dict__
        if name not in own:
            delattr(self.__object(), name)
        else:
            del own[name]

    def __getitem__(self, key):
        target = self.__object()
        return target[key]

    def __setitem__(self, key, value):
        target = self.__object()
        target[key] = value

    def __delitem__(self, key):
        target = self.__object()
        del target[key]

    def __call__(self, *args, **kw):
        target = self.__object()
        return target(*args, **kw)
def dbconfig(prefix='db'):
    """Build database connection keywords from the application config.

    Reads the `<prefix>type`, `<prefix>name`, `<prefix>user` and
    `<prefix>pass` settings. A missing type falls back to 'sqlite' and a
    missing name to ':memory:'. The returned dict has the keys `dbn` and
    `db`, plus `user` and `pw` when a user is configured.
    """
    from .config_loader import config

    settings = [config.get(key % prefix, None)
                for key in ('%stype', '%sname', '%suser', '%spass')]
    dbtype, dbname, dbuser, dbpass = settings

    kw = {
        'dbn': 'sqlite' if dbtype is None else dbtype,
        'db': ':memory:' if dbname is None else dbname,
    }
    if dbuser is not None:
        kw['user'] = dbuser
        kw['pw'] = dbpass
    return kw
def fixsqlite(kw):
    """Resolve a relative sqlite database name to a file under `$basedir/var`.

    For sqlite databases, a relative name `NAME` becomes
    `<config.basedir>/var/NAME.sqlite`. The in-memory database (':memory:'),
    absolute paths and non-sqlite databases are left untouched.

    `config.basedir` is only available once the application is initialised,
    so this function must be called from within a LazyObject.

    The dict is modified in place and also returned.
    """
    if kw['dbn'] != 'sqlite':
        return kw
    dbname = kw['db']
    # ':memory:' is the special in-memory name (and dbconfig's default); it
    # must never be turned into a file path.
    if dbname == ':memory:' or path.isabs(dbname):
        return kw
    from .config_loader import config
    kw['db'] = path.join(config.basedir, 'var', '%s.sqlite' % dbname)
    return kw
def lazydb(prefix='db'):
    """Return a LazyObject that opens the configured database on first use.

    The connection keywords come from `dbconfig(prefix)`, are passed through
    `fixsqlite`, and are handed to `web.database` only when the proxy is
    first resolved.
    """
    class LazyDb(LazyObject):
        def _resolve(self):
            return web.database(**fixsqlite(dbconfig(prefix)))

    return LazyDb()
class oradbinfos(object):
    """Callable that renders a `user@dsn` label for a database object.

    The label is built from the `user` and `dsn` entries of `db.keywords`
    each time the object is called.
    """

    def __init__(self, db):
        self.db = db

    def __call__(self):
        keywords = self.db.keywords
        user = keywords['user']
        dsn = keywords['dsn']
        return u"%s@%s" % (user, dsn)

    def __unicode__(self):
        # Same text as calling the object.
        return self()
class ColSchema(odict):
    """Describe how the values of one column are formatted and linked.

    `formatter` is either a %-format string, a callable taking the value, or
    None to leave the value as is. `link` is an optional link target used by
    `html()`.
    """

    def __init__(self, formatter=Undef, link=Undef, **kw):
        # NOTE(review): Undef.sa presumably lets the short keywords 'f' and
        # 'u' in kw stand in for the argument, with None as the default;
        # confirm against its definition.
        formatter = Undef.sa(formatter, kw, 'f', None)
        link = Undef.sa(link, kw, 'u', None)
        super(ColSchema, self).__init__(**kw)
        self.formatter = formatter
        self.link = link

    def css(self, value=Undef):
        """Return the CSS class for `value`; this base class has none."""
        return None

    def format(self, value, index=Undef):
        """Return `value` run through the formatter (`index` is unused here)."""
        formatter = self.formatter
        if formatter is None:
            return value
        # Accept byte strings, unicode strings and their subclasses as format
        # strings; an exact-type test would try to call a unicode formatter.
        if isinstance(formatter, (StringType, type(u''))):
            return formatter % value
        return formatter(value)

    def text(self, value, index=Undef):
        """Return the plain-text rendering, which is the formatted value."""
        return self.format(value, index)

    def html(self, value, index=Undef):
        """Return the formatted value, wrapped in an anchor when a link is set."""
        # NOTE(review): link and value are inserted without HTML escaping;
        # this assumes uval or the callers supply safe text. Confirm.
        link = uval(self.link)
        value = uval(self.format(value, index))
        if link is None:
            return value
        return u'<a href="%s">%s</a>' % (link, value)
class Col(odict):
    """A table column: a key, a display text and an optional ColSchema.

    When no schema is given but a formatter or a link is, a ColSchema is
    built from them. The display text defaults to the key.
    """

    def __init__(self, key, text=Undef, schema=Undef, formatter=Undef, link=Undef, **kw):
        text = Undef.sa(text, kw, 't', None)
        schema = Undef.sa(schema, kw, 's', None)
        formatter = Undef.sa(formatter, kw, 'f')
        link = Undef.sa(link, kw, 'u')
        super(Col, self).__init__(**kw)

        # Build a schema only when the caller supplied formatting details.
        nothing_to_format = formatter is Undef and link is Undef
        if schema is None and not nothing_to_format:
            schema = ColSchema(formatter, link)

        self.key = key
        self.text = key if text is None else text
        self.schema = schema
def geti(values, index):
    """Return `values[index]`, or None when index is negative or past the end."""
    if index < 0 or index >= len(values):
        return None
    return values[index]
class RowSchema(odict):
    """Schema of one kind of row: its columns, primary keys and actions."""

    def __init__(self, cols=Undef, pks=Undef, actions=Undef, **kw):
        cols = seqof(Undef.sa(cols, kw, None, None))
        pks = seqof(Undef.sa(pks, kw, 'pk', None))
        actions = seqof(Undef.sa(actions, kw, None, None))
        super(RowSchema, self).__init__(**kw)
        # Written straight into __dict__, bypassing attribute assignment.
        self.__dict__['overlays'] = []
        self.cols = cols
        self.pks = pks
        self.actions = actions

    def col(self, index=Undef):
        """Return the column at `index`, or None if absent or not given."""
        if index is Undef:
            return None
        return geti(self.cols, index)

    def col_schema(self, value=Undef, index=Undef):
        """Return the schema of the column at `index`, or None."""
        col = None if index is Undef else geti(self.cols, index)
        return None if col is None else col.schema

    def pk_dict(self, values):
        """Return a dict of the primary-key entries taken from `values`."""
        return dict((pk, values[pk]) for pk in self.pks)

    def css(self, value=Undef):
        """Return the CSS class for the row; this base class has none."""
        return None
class Row(object):
    """Proxy to a row returned by web.database.

    This object is meant to be extended with new computed properties.
    Attribute and item lookups that fail with the exact name are retried
    with the upper-cased and then the lower-cased name.
    """

    _row = None  # wrapped row; the real value lives in the instance __dict__

    def __init__(self, row):
        self.__dict__['_row'] = row

    # -- mapping methods delegated to the wrapped row -----------------------
    def keys(self): return self._row.keys()
    def values(self): return self._row.values()
    def items(self): return self._row.items()
    # The next methods need arguments (key, default, ...): forward them as
    # given, otherwise every real call fails with a TypeError.
    def has_key(self, *args, **kw): return self._row.has_key(*args, **kw)
    def get(self, *args, **kw): return self._row.get(*args, **kw)
    def clear(self): return self._row.clear()
    def setdefault(self, *args, **kw): return self._row.setdefault(*args, **kw)
    def iterkeys(self): return self._row.iterkeys()
    def itervalues(self): return self._row.itervalues()
    def iteritems(self): return self._row.iteritems()
    def pop(self, *args, **kw): return self._row.pop(*args, **kw)
    def popitem(self): return self._row.popitem()
    def update(self, *args, **kw): return self._row.update(*args, **kw)
    def __iter__(self): return self._row.__iter__()
    def __contains__(self, key): return self._row.__contains__(key)

    def copy(self):
        """Return a new proxy of the same class around the same row object."""
        return self.__class__(self._row)

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        row = self._row
        try:
            return getattr(row, name)
        except AttributeError:
            # Retry with the other letter cases before giving up.
            uname = name.upper()
            if hasattr(row, uname): return getattr(row, uname)
            lname = name.lower()
            if hasattr(row, lname): return getattr(row, lname)
            raise

    def __setattr__(self, name, value):
        # Names already in the proxy's own __dict__ stay on the proxy;
        # everything else is set on the wrapped row.
        if name in self.__dict__: self.__dict__[name] = value
        else: setattr(self._row, name, value)

    def __delattr__(self, name):
        if name in self.__dict__: del self.__dict__[name]
        else: delattr(self._row, name)

    def __getitem__(self, key):
        row = self._row
        try:
            return row[key]
        except KeyError:
            # Retry with the other letter cases before giving up.
            ukey = key.upper()
            if ukey in row: return row[ukey]
            lkey = key.lower()
            if lkey in row: return row[lkey]
            raise

    def __setitem__(self, key, value):
        self._row[key] = value

    def __delitem__(self, key):
        del self._row[key]

    def __repr__(self): return '<%s:%r>' % (self.__class__.__name__, self._row)
    def __str__(self): return str(self._row)
def _firstof(values):
    """Return the first item yielded by `values`, or None when it is empty."""
    return next(iter(values), None)
class TableSchema(odict):
    """Schema of a table: columns, primary keys, actions and row schemas.

    Two row schemas are kept, under the keys 'header' and 'body'; both are
    built from the same columns, primary keys and actions.
    """

    @staticmethod
    def parse(rows):
        """Build a schema whose columns are the keys of the first row.

        `rows` is assumed to be a list of dictionaries; when it is empty the
        schema is created without columns.
        """
        if rows:
            cols = [Col(key) for key in _firstof(rows)]
        else:
            cols = Undef
        return TableSchema(cols)

    def __init__(self, cols=Undef, pks=Undef, actions=Undef, **kw):
        cols = seqof(Undef.sa(cols, kw, None, None))
        pks = seqof(Undef.sa(pks, kw, 'pk', None))
        actions = seqof(Undef.sa(actions, kw, None, None))
        super(TableSchema, self).__init__(**kw)
        self.cols = cols
        self.pks = pks
        self.actions = actions
        self.row_schemas = {}
        for part in ('header', 'body'):
            self.row_schemas[part] = RowSchema(cols, pks, actions)

    def headers(self):
        """Return the display text of every column, in order."""
        texts = []
        for col in self.cols:
            texts.append(col.text)
        return texts

    def header_row_schema(self):
        """Return the row schema used for the header row."""
        return self.row_schemas["header"]

    def body_row_schema(self, index):
        """Return the row schema for body rows (`index` is not used here)."""
        return self.row_schemas["body"]

    def css(self):
        """Return the CSS class for the table; this base class has none."""
        return None
def incd(fd, *names):
    """Return a dict of the entries of `fd` named in `names` that are not None.

    Without `names`, every key of `fd` except 'self' is considered.
    """
    wanted = names or [key for key in fd.keys() if key != 'self']
    candidates = ((key, fd.get(key, None)) for key in wanted)
    return dict((key, value) for key, value in candidates if value is not None)
def excd(fd, *excludes):
    """Return a dict of the entries of `fd` that are not excluded and not None.

    The key 'self' is always excluded, in addition to the given `excludes`.
    """
    skipped = set(excludes)
    skipped.add('self')
    kept = {}
    for key in fd.keys():
        if key in skipped:
            continue
        value = fd.get(key, None)
        if value is not None:
            kept[key] = value
    return kept
class DBMixin(object):
    """Shortcuts around a database handle for classes that own one.

    The host class must supply `DB` (the database handle) and `_map` (applied
    to result sets). The `tb*`, `find`, `get`, `create` and `pk*` helpers
    also need `TB` (table name), and the `pk*` helpers need `PK`
    (primary-key column). None of these are defined here.
    """

    # -- reads: results go through self._map --------------------------------
    def dbquery(self, *args, **kw): return self._map(self.DB.query(*args, **kw))
    def dbwhere(self, *args, **kw): return self._map(self.DB.where(*args, **kw))
    def dbselect(self, *args, **kw): return self._map(self.DB.select(*args, **kw))
    query, where, select = dbquery, dbwhere, dbselect

    # -- writes: the handle's result is returned as is ----------------------
    def dbinsert(self, *args, **kw): return self.DB.insert(*args, **kw)
    def dbmultiple_insert(self, *args, **kw): return self.DB.multiple_insert(*args, **kw)
    def dbupdate(self, *args, **kw): return self.DB.update(*args, **kw)
    def dbdelete(self, *args, **kw): return self.DB.delete(*args, **kw)
    insert, multiple_insert, update, delete = dbinsert, dbmultiple_insert, dbupdate, dbdelete

    def dbtrans(self, *args, **kw): return self.DB.transaction(*args, **kw)
    transaction = dbtrans

    def _fix_NULL(self, row, allow_null=True):
        """Replace 'NULL' string values of `row` by None, in place.

        Nothing is changed when `allow_null` is false. Returns `row`.
        """
        if allow_null:
            for key, value in row.items():
                if value == 'NULL':
                    row[key] = None
        return row

    def dbloadcsv(self, table, file, allow_null=True):
        """Insert every record of the CSV file `file` into `table`.

        The first CSV line gives the column names (csv.DictReader). With
        `allow_null`, cells containing the text NULL are inserted as None.
        """
        inf = open(file, "rb")
        try:
            reader = csv.DictReader(inf)
            self.dbmultiple_insert(table, [self._fix_NULL(row, allow_null) for row in reader])
        finally:
            inf.close()

    # -- same operations, bound to the table self.TB ------------------------
    def tbwhere(self, *args, **kw): return self._map(self.DB.where(self.TB, *args, **kw))
    def tbselect(self, *args, **kw): return self._map(self.DB.select(self.TB, *args, **kw))
    def tbinsert(self, *args, **kw): return self.DB.insert(self.TB, *args, **kw)
    def tbmultiple_insert(self, *args, **kw): return self.DB.multiple_insert(self.TB, *args, **kw)
    def tbupdate(self, *args, **kw): return self.DB.update(self.TB, *args, **kw)
    def tbdelete(self, *args, **kw): return self.DB.delete(self.TB, *args, **kw)

    def _parse(self, kw):
        """Hook to convert the values of `kw` in place; does nothing here."""
        pass

    def find(self, **kw):
        """Return the rows of the table whose columns equal the given values."""
        self._parse(kw)
        return self.tbwhere(**kw)

    def get(self, **kw):
        """Return the first row matching the given values.

        NOTE(review): presumably raises IndexError when nothing matches;
        confirm against web.iterbetter.
        """
        return web.iterbetter(self.find(**kw))[0]

    def create(self, **kw):
        """Insert a row built from the given values; return the insert result."""
        self._parse(kw)
        return self.tbinsert(**kw)

    def pkupdate(self, pk, **kw):
        """Update the row whose primary key is `pk`; return the update result.

        Values that are None are dropped by excd and therefore not written.
        """
        self._parse(kw)
        # Pass only the variable the where clause needs, not all locals.
        return self.tbupdate("%s = $pk" % self.PK, vars=dict(pk=pk), **excd(kw, 'pk'))

    def pkdelete(self, pk):
        """Delete the row whose primary key is `pk`; return the delete result."""
        return self.tbdelete("%s = $pk" % self.PK, vars=dict(pk=pk))
class RowCtl(DBMixin):
    """Controller for one table, with optional value parsers and row wrapper.

    Subclasses set `DB` and `TB`, and may set `ROW` (a callable applied to
    each result row), `PK` (primary-key column, "id" by default) and
    `PARSERS` (a dict mapping a keyword name to a converter).
    """

    DB = None
    TB = None
    ROW = None
    PK = "id"
    PARSERS = None

    def _parse(self, kw):
        """Convert, in place, each value of `kw` that has a registered parser."""
        parsers = self.PARSERS
        if parsers is None:
            return
        for key, parser in parsers.items():
            if key not in kw:
                continue
            kw[key] = parser(kw[key])

    def _map(self, items):
        """Wrap each item with `ROW` lazily, or return `items` when unset."""
        wrap = self.ROW
        if wrap is not None:
            return (wrap(item) for item in items)
        return items
class Migration(DBMixin):
    """Run numbered schema migrations and record the version reached.

    Subclasses set `DB` and `NAME`, and define methods named `version0`,
    `version1`, and so on. The version reached is stored per `NAME` in the
    `_dbupdater` table.
    """

    DB = None
    NAME = None

    def _map(self, items):
        # Result sets are used as returned by the database handle.
        return items

    def migrate(self):
        """Run every pending `versionN` method in order.

        Starts just after the recorded version and stops at the first number
        with no matching method. The recorded version is updated after each
        step.
        """
        version = self.initial() + 1
        while True:
            method = getattr(self, 'version%i' % version, None)
            if method is None: break
            method()
            self.dbquery("update _dbupdater set version = $version where name = $name",
                         vars=dict(version=version, name=self.NAME))
            version += 1

    def __dbinfos(self):
        # First `_dbupdater` record for this migration, if any.
        return self.dbwhere('_dbupdater', name=self.NAME).first()

    def initial(self):
        """Return the recorded version, creating the bookkeeping on first use.

        If reading `_dbupdater` fails, the table is created. If there is no
        record for `NAME`, one is inserted with version -1, so the first
        migration run is `version0`.
        """
        try:
            dbinfos = self.__dbinfos()
        except Exception:
            # The read is assumed to fail because the table is missing.
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            self.dbquery("create table _dbupdater (name varchar(64), version integer)")
            dbinfos = self.__dbinfos()
        if not dbinfos:
            self.dbinsert('_dbupdater', name=self.NAME, version=-1)
            dbinfos = self.__dbinfos()
        return dbinfos.version