# -*- coding: utf-8 mode: python -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
import ulib.base.i_need_py24
__all__ = ('nocache', 'auth', 'defaults', 'Page', 'Application')
import os, sys, base64, logging
from os import path
from ulib.base.base import isstr, isbool, isnum, isflt, seqof, make_prop
from ulib.base.uio import _u, Utf8IO
from ulib.base.output import enote
from ulib.base.args import get_args
from ulib.base.dates import rfc2822
from ulib.base.functions import apply_args
from ulib.base.words import plural
from ulib.formats import unicodeF, booleanF, integerF, floatF
from ulib.ext import web
def checked(b):
return b and u'checked="checked"' or u''
def favicon(href):
return ur'' % href
def css(href, media=None):
if media is not None:
media = ur' media="%s"' % media
return ur'' % (href, media or u'')
def js(href):
return ur'' % href
def jscss(href, min=False):
if href.endswith(".min.js"):
jshref = href
csshref = "%s.css" % href[:-len(".min.js")]
elif href.endswith(".js"):
jshref = href
csshref = "%s.css" % href[:-len(".js")]
elif href.endswith(".css"):
jshref = "%s.js" % href[:-len(".css")]
csshref = href
return u"%s\n%s" % (js(jshref), css(csshref))
BPSUPPL_CSS = "bpsuppl.css"
def blueprintcss(prefix=None, suppl=True):
if prefix is None:
prefix = ""
bpsuppl = BPSUPPL_CSS
else:
pprefix = path.split(prefix)[0]
bpsuppl = path.join(pprefix, BPSUPPL_CSS)
if not prefix.endswith("/"): prefix = prefix + "/"
return u"\n".join([css("%sscreen.css" % prefix, "screen, projection"),
css("%sprint.css" % prefix, "print"),
u"",
css(bpsuppl),
])
DEFAULT_TEMPLATE_GLOBALS = {'_u': _u,
'checked': checked,
'favicon': favicon,
'css': css,
'js': js,
'jscss': jscss,
'blueprintcss': blueprintcss,
'plural': plural,
}
HANDLER_CLASSES = []
class MetaPage(type):
u"""Méta-classe qui suit la définition des classes dérivées de Page, afin
de construire automatiquement la liste des classes handler.
"""
def __init__(cls, name, bases, attrs):
type.__init__(cls, name, bases, attrs)
abstract = cls.__dict__.get('__abstract_page__', False)
if not abstract:
_fix_PATH(cls, name)
HANDLER_CLASSES.append(cls)
def nocache(method):
u"""Décorateur pour s'assurer que le résultat de la méthode web n'est pas mis en cache
"""
def wrapper(self, *args, **kw):
web.header("Expires", rfc2822(0))
web.header("Pragma", "no-cache")
web.header("Cache-control", "max-age=0, no-cache, must-revalidate")
return method(self, *args, **kw)
return wrapper
def auth(authenticator=None, realm='pyutools'):
u"""Décorateur pour s'assurer que la méthode web est authentifiée.
La fonction authenticator avec la signature (username, password) permet
d'authentifier l'utilisateur connecté.
"""
def decorator(method):
def wrapper(self, *args, **kw):
env = web.ctx.environ
if env.has_key('HTTP_AUTHORIZATION'):
auth = env['HTTP_AUTHORIZATION'].split()
if auth[0:1] and auth[0].lower() == 'basic':
username, password = base64.b64decode(auth[1]).split(':')
if authenticator is None or apply_args(authenticator, username, password):
return method(self, *args, **kw)
web.header('WWW-Authenticate', 'Basic realm="%s"' % realm, True)
return web.HTTPError('401 Unauthorized')
return wrapper
return decorator
def defaults(*required, **defaults):
u"""Initialiser dans l'objet courant des variables à des valeurs par
défaut, ou en les prenant parmi les paramètres de la requête.
Si la valeur par défaut est respectivement une chaine, un booléen, un entier,
une valeur flottante, la valeur de la requête est automatiquement convertie
et la valeur effective sera toujours respectivement une chaine, un booléen,
un entier, une valeur flottante
"""
def decorator(method):
def wrapper(*args, **kw):
self = args[0:1] and args[0] or None
if self is not None:
self.defaults = web.input(*required, **defaults)
def fix(name, format):
value = self.defaults[name] or None
if format is not None:
self.defaults[name] = format.parse(value)
for name, value in defaults.items():
if isstr(value): fix(name, unicodeF)
elif isbool(value): fix(name, booleanF)
elif isnum(value): fix(name, integerF)
elif isflt(value): fix(name, floatF)
else: fix(name, None)
return method(*args, **kw)
return wrapper
return decorator
class Msg(object):
DEFAULT_MSG_CLASSES = (u"success", )
_msg_value, msg_value = make_prop('_msg_value')[:2]
_msg_classes, msg_classes = make_prop('_msg_classes', DEFAULT_MSG_CLASSES)[:2]
msg_class = property(lambda self: u" ".join(self._msg_classes))
_level, level = make_prop('_level')[:2]
def __fix_msg_class(self, msg_class):
if msg_class is None: msg_class = self.DEFAULT_MSG_CLASSES[0]
return _u(msg_class)
def __init__(self, msg_value=None, msg_classes=None, level=None):
if msg_value is not None: self._msg_value = _u(msg_value)
if msg_classes is not None:
self._msg_classes = tuple(map(self.__fix_msg_class, seqof(msg_classes)))
if level is not None: self._level = level
NO_MSG = Msg()
def _fix_PATH(cls, name):
if cls.PATH is None:
if cls.PREFIX is None:
cls.PREFIX = '/%s' % name
if cls.PREFIX == '/': cls.PATH = '/(.*)'
else: cls.PATH = '%s(?:/(.*))?' % cls.PREFIX
class Page(object):
__metaclass__ = MetaPage
__abstract_page__ = True
app = None # instance de Application
uio = None # instance de Utf8IO
session = property(lambda self: self.app.session)
_render = None # instance de web.template.render
TEMPLATEDIR = None # répertoire relatif des templates pour cette classe
TEMPLATENAME = None # nom du template à utiliser pour cette classe
# par défaut, TEMPLATENAME = le_nom_de_la_classe
PREFIX = None # url relatif d'accès aux méthodes de cette classe
# par défaut, PREFIX = '/%s' % le_nom_de_la_classe
PATH = None # expression régulière qui matche les urls traités par cette classe
# par défaut, PATH = '%s/(.*)' % PREFIX
defaults = None
def __getattr__(self, name):
defaults = self.defaults
if defaults is not None:
if defaults.has_key(name):
return defaults[name]
raise AttributeError(name)
def init(self):
u"""Initialiser l'objet
"""
msgs = property(lambda self: self.app.msgs)
def clear_msgs(self, level=None): return self.app.clear_msgs(level)
def add_msg(self, msg_value, msg_class=None, level=None):
return self.app.add_msg(msg_value, msg_class, level)
last_msg = property(lambda self: self.app.last_msg)
msg_value = property(lambda self: self.app.msg_value)
msg_class = property(lambda self: self.app.msg_class)
def get_last_error_msg(self, exc_info=None):
u"""Obtenir le message d'erreur de la dernière exception, ainsi que la
classe CSS associée.
msg vaut None s'il n'y a pas d'erreur. msgclass peut valoir success,
notice ou error.
@return: msg, msgclass
"""
if exc_info is None: exc_info = sys.exc_info()[:2]
type, value = exc_info[:2]
if type is None:
return None, u"success"
elif isinstance(value, Warning):
return u"%s: %s" % (_u(type.__name__), _u(value)), u"notice"
elif isinstance(value, Exception):
return u"%s: %s" % (_u(type.__name__), _u(value)), u"error"
last_error_msg = property(get_last_error_msg)
def add_last_error_msg(self, log=True):
msg_value, msg_class = self.get_last_error_msg()
if log: logging.exception(msg_value)
self.add_msg(msg_value, msg_class)
def redirect(self, path, **kw):
return web.redirect(web.url(path, **kw))
def r(self, name, *args, **kw):
templatedir = getattr(self, 'TEMPLATEDIR', None)
if templatedir is not None:
name = path.join(templatedir, name)
render = getattr(self._render, name)
return render(*args, **kw)
def render(self, *args, **kw):
name = getattr(self, 'TEMPLATENAME', None)
if name is None: name = self.__class__.__name__
return self.r(name, *args, **kw)
NAME = None # nom de la méthode à appeler, ou None s'il n'y a pas de méthode définie
METHOD = None # GET ou POST
def __response(self, name, method):
self.NAME = name
self.METHOD = method
if not name: name = self.__class__.__name__
m = getattr(self, name, None)
if m is None: m = self.error
# XXX Ajouter un argument en fonction de la valeur du header Accept:, pour text/plain,
# text/html, application/xml et application/xhtml+xml
return apply_args(m, method)
def GET(self, name=None): return self.__response(name, 'GET')
def POST(self, name=None): return self.__response(name, 'POST')
def error(self): raise web.notfound()
def index(self): return self.render()
class Application(object):
HOST = '0.0.0.0'
PORT = 12345
DEBUG = False
webapp = None # instance de web.application
uio = Utf8IO()
session = None # instance de web.session.Session
basedir = None # répertoire de base pour les fichiers servis
render = None # instance de web.template.render
templatedir = None # répertoire des templates
template_globals = None # dictionnaire global pour render
args = None # arguments qui restent sur la ligne de commande
# messages de l'application
_msgs, msgs = make_prop('_msgs')[:2]
def __init__(self, basedir=None, templatedir=None, host=None, port=None, debug=None):
if host is not None: self.HOST = host
if port is not None: self.PORT = port
if debug is not None: self.DEBUG = debug
if self.basedir is None or basedir is not None:
if basedir is None:
# par défaut, basedir est répertoire qui contient la classe
module_name = self.__class__.__module__
module = sys.modules.get(module_name, None)
if module is not None:
file = module.__file__
if file is not None:
basedir = path.abspath(path.split(file)[0])
if basedir is None:
basedir = os.getcwd()
self.basedir = basedir
if self.templatedir is None or templatedir is not None:
if templatedir is None:
templatedir = path.join(self.basedir, 'templates')
self.templatedir = templatedir
tg = self.template_globals
if tg is None: tg = {}
tg.update(DEFAULT_TEMPLATE_GLOBALS)
self.template_globals = tg
self.clear_msgs()
OPTIONS = None # options
LONG_OPTIONS = None
def process_option(self, option, value):
u"""Traiter une option et retourner True, ou False si l'option n'est
pas reconnue.
Cette méthode est prévue pour être dérivée.
"""
return False
# process_option() s'appelait auparavant is_option(). Cette méthode est
# gardée pour compatibilité
def is_option(self, option, value): return False
def process_args(self, args):
u"""Traiter les arguments après les options.
Cette méthode est appelée avant que soit initialisé le serveur web, et
est prévue pour être dérivée.
"""
def before_start(self):
u"""Effectuer un traitement avant de lancer le serveur web standalone.
Cette méthode est appelée juste avant que soit lancé le serveur web, et
est prévue pour être dérivée.
"""
if web.config.debug:
enote(u"Lancement du serveur sur http://%s:%i" % (self.HOST, self.PORT))
def new_render(self, templatedir=None):
u"""Retourner une nouvelle instance de web.template.render
"""
if templatedir is None: templatedir = self.templatedir
return web.template.render(templatedir,
cache=not self.DEBUG,
globals=self.template_globals)
def _new_application(self):
u"""Créer une nouvelle instance de web.application
"""
return web.application()
def _new_session(self):
u"""Si les sessions ne sont pas supportées, retourner None.
Sinon, retourner un tuple (store, initializer) où:
- store est une instance de web.session.Store, qui sert à stocker les
sessions.
- initializer est le dictionnaire initial, et peut être omis
Exemple:
return web.session.DiskStore('sessions'), {}
"""
return None
def _configure_handler_class(self, handler_class):
if getattr(handler_class, 'uio', None) is None:
handler_class.uio = self.uio
self.webapp.add_mapping(handler_class.PATH, handler_class)
def _configure_handler(self, handler):
handler.app = self
handler._render = self.render
handler.init()
def _wsgifunc(self):
u"""Obtenir la fonction pour servir les requêtes wsgi
"""
return self.webapp.wsgifunc()
def _start_server(self, server_type=None, server_socket=None):
u"""Démarrer le serveur web
"""
func = self._wsgifunc()
if os.environ.has_key('SERVER_SOFTWARE'): # cgi
os.environ['FCGI_FORCE_CGI'] = 'Y'
if (os.environ.has_key('PHP_FCGI_CHILDREN') #lighttpd fastcgi
or os.environ.has_key('SERVER_SOFTWARE')):
return web.wsgi.runfcgi(func, None)
if server_type in ('fcgi', 'fastcgi'):
return web.wsgi.runfcgi(func, server_socket or (self.HOST, self.PORT))
elif server_type in ('scgi', ):
return web.wsgi.runscgi(func, server_socket or (self.HOST, self.PORT))
else:
return web.httpserver.runsimple(func, (self.HOST, self.PORT))
def __configure(self):
web.config.debug = self.DEBUG
web.config.BASEDIR = self.basedir
self.webapp = self._new_application()
self.webapp.set_handler_configurator(self._configure_handler)
for handler_class in HANDLER_CLASSES:
self._configure_handler_class(handler_class)
session = None
tmp = self._new_session()
if tmp is not None:
tmp = seqof(tmp)
store = tmp[0:1] and tmp[0] or None
initializer = tmp[1:2] and tmp[1] or None
if store is not None:
session = web.config.get("_session", None)
if session is None:
session = web.session.Session(self.webapp, store, initializer=initializer)
web.config._session = session
self.session = session
tg = self.template_globals
if session is not None and not tg.has_key("session"):
tg["session"] = session
self.render = web.template.render(self.templatedir, cache=not self.DEBUG, globals=tg)
def wsgiapp(self):
u"""Retourner l'instance à utiliser pour mod_wsgi.
mod_wsgi s'attend à trouver une variable nommée application dans le
module spécifié dans la configuration. Il faut donc utiliser cette
méthode de cette façon:
class MyApp(Application):
...
application = MyApp().wsgiapp()
"""
self.__configure()
return self._wsgifunc()
def run(self, args=None, **ignored):
u"""Démarrer le serveur, en standalone, fastcgi ou scgi suivant les
arguments de la ligne de commande.
Utiliser cette méthode de cette façon:
class MyApp(Application):
...
if __name__ == '__main__':
import sys
MyApp().run(sys.argv[1:])
"""
if args is None: args = []
options, args = get_args(map(_u, args),
'S:s:H:P:D' + (self.OPTIONS or ''),
['server-type=', 'server-socket=', 'host=', 'port=', 'debug'] + list(self.LONG_OPTIONS or ()))
server_type = None
server_socket = None
for option, value in options:
if self.process_option(option, value): pass
elif self.is_option(option, value): pass #compatibilité
elif option in ('-S', '--server-type'): server_type = value
elif option in ('-s', '--server-socket'): server_socket = value
elif option in ('-H', '--host'): self.HOST = value
elif option in ('-P', '--port'): self.PORT = int(value)
elif option in ('-D', '--debug'): self.DEBUG = True
self.args = args
self.process_args(args)
self.__configure()
self.before_start()
self._start_server(server_type, server_socket)
def cgirun(self):
u"""Retourner un handler CGI. A utiliser avec Google AppEngine
Utiliser cette méthode de cette façon:
class MyApp(Application):
...
main = MyApp().cgirun()
if __name__ == '__main__':
main()
"""
self.__configure()
wsgifunc = self._wsgifunc()
try:
from google.appengine.ext.webapp.util import run_wsgi_app
return run_wsgi_app(wsgifunc)
except ImportError:
# we're not running from within Google App Engine
import wsgiref
return wsgiref.handlers.CGIHandler().run(wsgifunc)
# gestion des messages
def clear_msgs(self, level=None):
if level is not None:
self._msgs = [msg for msg in self._msgs if msg.level > level]
else: self._msgs = []
return self._msgs
def add_msg(self, msg_value, msg_class=None, level=None):
if msg_value is None and msg_class is None: return False
self._msgs.append(Msg(msg_value, msg_class, level))
return True
last_msg = property(lambda self: self._msgs and self._msgs[-1] or NO_MSG)
msg_value = property(lambda self: self.get_last_msg().msg_value)
msg_class = property(lambda self: self.get_last_msg().msg_class)