# -*- coding: utf-8 -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8

"""Helpers for colorized message display.

The module exposes ANSI color constants, a verbosity level stored in the
process environment (``__verbosity``), and a family of ``e*`` functions that
print prefixed messages on stderr (or send them to syslog when the verbosity
is ``V_SYSLOG``).
"""

import i_need_py23

__all__ = (
    'IN_ISATTY', 'OUT_ISATTY', 'ERR_ISATTY',
    'RESET',
    'BLACK', 'RED', 'GREEN', 'YELLOW', 'BLUE', 'MAGENTA', 'CYAN', 'WHITE',
    'DEFAULT',
    'BLACK_BG', 'RED_BG', 'GREEN_BG', 'YELLOW_BG', 'BLUE_BG', 'MAGENTA_BG',
    'CYAN_BG', 'WHITE_BG',
    'DEFAULT_BG',
    'BOLD', 'FAINT', 'UNDERLINED', 'REVERSE', 'NORMAL',
    'get_color', 'get_colored',
    'V_SYSLOG', 'V_NONE', 'V_ERROR', 'V_WARNING', 'V_INFO', 'V_DEBUG',
    'set_verbosity', 'check_verbosity',
    'show_error', 'show_warn', 'show_info', 'show_debug',
    'eprint',
    'eerror', 'ewarn', 'enote', 'eimportant', 'eattention', 'einfo',
    'eecho', 'eecho_', 'edebug',
    'etitle', 'estep', 'estep_', 'ebegin', 'edot', 'eend',
)

import os
import sys
import re
try:
    import syslog
except ImportError:
    # syslog is not available on every platform; eprint() checks for None.
    syslog = None
from traceback import format_exception

from base import isnum, isseq, seqof, scriptname
from uio import _u, _s, uprint


################################################################################
def is_debug():
    """Return True when the DEBUG environment variable is set and non-empty."""
    return os.environ.get('DEBUG', '') != ''


################################################################################
# Colors

def _isatty(f):
    """Return f.isatty(), or False if the call fails for any reason."""
    try:
        return f.isatty()
    except Exception:
        return False


IN = sys.stdin
IN_ISATTY = _isatty(IN)
OUT = sys.stdout
OUT_ISATTY = _isatty(OUT)
ERR = sys.stderr
ERR_ISATTY = _isatty(ERR)
# Colors are emitted only when both stdout and stderr are terminals.
USE_COLORS = OUT_ISATTY and ERR_ISATTY

RESET = '0'
BLACK = '30'; RED = '31'; GREEN = '32'; YELLOW = '33'
BLUE = '34'; MAGENTA = '35'; CYAN = '36'; WHITE = '37'
DEFAULT = '39'
BLACK_BG = '40'; RED_BG = '41'; GREEN_BG = '42'; YELLOW_BG = '43'
BLUE_BG = '44'; MAGENTA_BG = '45'; CYAN_BG = '46'; WHITE_BG = '47'
DEFAULT_BG = '49'
BOLD = '1'; FAINT = '2'; UNDERLINED = '4'; REVERSE = '7'; NORMAL = '22'

RE_COMMA = re.compile(r'\s*,\s*')
RE_COLOR = re.compile(r'[0-9]+$')

# Long (case-insensitive) color names accepted by get_color().
LONG_COLOR_MAP = {
    'reset': RESET,
    'black': BLACK, 'red': RED, 'green': GREEN, 'yellow': YELLOW,
    'blue': BLUE, 'magenta': MAGENTA, 'cyan': CYAN, 'white': WHITE,
    'black_bg': BLACK_BG, 'red_bg': RED_BG, 'green_bg': GREEN_BG,
    'yellow_bg': YELLOW_BG, 'blue_bg': BLUE_BG, 'magenta_bg': MAGENTA_BG,
    'cyan_bg': CYAN_BG, 'white_bg': WHITE_BG,
    'bold': BOLD, 'faint': FAINT, 'underlined': UNDERLINED,
    'reverse': REVERSE, 'normal': NORMAL,
}

# One-character (case-sensitive) color codes accepted by get_color().
COLOR_MAP = {
    'z': RESET,
    'o': BLACK, 'r': RED, 'g': GREEN, 'y': YELLOW,
    'b': BLUE, 'm': MAGENTA, 'c': CYAN, 'w': WHITE,
    'O': BLACK_BG, 'R': RED_BG, 'G': GREEN_BG, 'Y': YELLOW_BG,
    'B': BLUE_BG, 'M': MAGENTA_BG, 'C': CYAN_BG, 'W': WHITE_BG,
    '@': BOLD, '-': FAINT, '_': UNDERLINED, '~': REVERSE, 'n': NORMAL,
}


def get_color(*colors):
    """Build the escape sequence selecting the given colors.

    Each color may be a numeric code ('31'), a long name from LONG_COLOR_MAP
    ('red'), or a string of one-character codes from COLOR_MAP ('r@').
    When a single argument is given it may also be a comma-separated list of
    such specs, an already-built sequence starting with ESC '[' (returned
    unchanged), or None (yields ''). Without arguments, RESET is used.

    Returns '' whenever USE_COLORS is false. Unknown names/characters are
    ignored.
    """
    if not USE_COLORS:
        return ''
    if not colors:
        colors = (RESET,)
    if len(colors) == 1:
        color = colors[0]
        if color is None:
            return ""
        elif color.startswith("\033["):
            return color
        elif color.find(",") != -1:
            colors = RE_COMMA.split(color)
    numcolors = []
    for color in colors:
        if RE_COLOR.match(color):
            numcolors.append(color)
            continue
        lcolor = color.lower()
        # 'in' replaces dict.has_key(), which no longer exists in Python 3.
        if lcolor in LONG_COLOR_MAP:
            numcolors.append(LONG_COLOR_MAP[lcolor])
        else:
            for c in color:
                if c in COLOR_MAP:
                    numcolors.append(COLOR_MAP[c])
    return "\033[%sm" % ';'.join(numcolors)


def get_colored(s, *colors):
    """Return s wrapped between get_color(*colors) and get_color(RESET)."""
    return get_color(*colors) + s + get_color(RESET)


################################################################################
# Verbosity level, used for tracing and for display

V_SYSLOG = -1
V_NONE = 0
V_ERROR = 1
V_WARNING = 2
V_INFO = 3
V_DEBUG = 4

V_MIN = V_NONE
V_DEF = V_INFO
V_MAX = V_DEBUG


def get_verbosity():
    """Return the verbosity stored in the '__verbosity' environment variable.

    Falls back to V_DEF when the variable is absent or is not an integer, so
    that a garbage value in the environment cannot break the module import.
    """
    try:
        return int(os.environ.get('__verbosity', V_DEF))
    except ValueError:
        return V_DEF


def _set_verbosity(v):
    """Store v in the '__verbosity' environment variable."""
    os.environ['__verbosity'] = str(v)


def inc_verbosity():
    """Raise the verbosity by one level, up to V_MAX."""
    v = get_verbosity()
    if v < V_MAX:
        _set_verbosity(v + 1)


def dec_verbosity():
    """Lower the verbosity by one level, down to V_MIN."""
    v = get_verbosity()
    if v > V_MIN:
        _set_verbosity(v - 1)


def set_verbosity(v):
    """Set the verbosity from a numeric level or a command-line option.

    v may be an empty non-numeric value (selects V_DEF), a number (stored as
    int(v)), or one of the options -Q/--very-quiet, -q/--quiet, -v/--verbose,
    -c/--default, -D/--debug.

    Returns True when v was recognized and applied, False otherwise.
    """
    if not isnum(v) and not v:
        v = V_DEF
    elif isnum(v):
        # NOTE(review): the guard above treats numbers as distinct from
        # "empty", so numeric levels (including 0 == V_NONE) are presumably
        # meant to be accepted rather than rejected -- confirm with callers.
        v = int(v)
    elif v in ('-Q', '--very-quiet'):
        v = V_MIN
    elif v in ('-q', '--quiet'):
        dec_verbosity()
        return True
    elif v in ('-v', '--verbose'):
        inc_verbosity()
        return True
    elif v in ('-c', '--default'):
        v = V_DEF
    elif v in ('-D', '--debug'):
        v = V_MAX
    else:
        return False
    _set_verbosity(v)
    return True


VERBOSITY_OPTS = ('-Q', '--very-quiet',
                  '-q', '--quiet',
                  '-v', '--verbose',
                  '-D', '--debug'
                  )
RE_NUM = re.compile(r'\d+$')


def check_verbosity(v=V_INFO):
    """Tell whether a message of level v should be shown.

    v is either a number (or a string of digits), compared against the
    current verbosity, or one of the verbosity options, each mapped to the
    minimum verbosity required. Any other value yields True.
    """
    if isnum(v) or RE_NUM.match(v) is not None:
        return get_verbosity() >= int(v)
    elif v in ('-Q', '--very-quiet'):
        return get_verbosity() >= V_ERROR
    elif v in ('-q', '--quiet'):
        return get_verbosity() >= V_WARNING
    elif v in ('-c', '--default'):
        return get_verbosity() >= V_INFO
    elif v in ('-v', '--verbose'):
        return get_verbosity() >= V_DEBUG
    elif v in ('-D', '--debug'):
        return is_debug() or get_verbosity() >= V_DEBUG
    else:
        return True


def show_error():
    """Return True when error messages should be shown."""
    return check_verbosity(V_ERROR)


def show_warn():
    """Return True when warning messages should be shown."""
    return check_verbosity(V_WARNING)


def show_info():
    """Return True when informational messages should be shown."""
    return check_verbosity(V_INFO)


def show_debug():
    """Return True when debug messages should be shown (or DEBUG is set)."""
    return is_debug() or check_verbosity(V_DEBUG)


_set_verbosity(get_verbosity())  # configure the default verbosity


################################################################################
# Message display

def get_tlevel():
    """Return the indentation prefix stored in the '__tlevel' variable."""
    return os.environ.get('__tlevel', '')


def set_tlevel(l):
    """Store the indentation prefix l in the '__tlevel' variable."""
    os.environ['__tlevel'] = l


def eprint(msg, minlevel=V_INFO, nl=False, flush=None, out=None):
    """Display msg if the current verbosity allows level minlevel.

    msg is a single part or a sequence of parts; each part is either a plain
    value or a (text, color) sequence, the color being passed to get_color().

    When the verbosity is V_SYSLOG and syslog is available, the uncolored
    text is sent to syslog with a priority derived from minlevel, and only
    complete lines (nl true) are sent. Otherwise the message is prefixed with
    the current indentation and written through uprint(msg, nl, flush,
    out=out).
    """
    msgs = seqof(msg, [])
    if get_verbosity() == V_SYSLOG and syslog is not None:
        if not nl:
            return  # ignore incomplete lines for syslog
        msg = ''
        for mc in msgs:
            if isseq(mc):
                # first element of the part, or '' if missing/empty
                m = mc[0:1] and mc[0] or ''
                msg += _u(m)
            else:
                msg += _u(mc)
        ntlevel = get_ntlevel()
        if ntlevel > 0:
            msg = u'[%i] %s' % (ntlevel, msg)
        priority = {
            V_NONE: syslog.LOG_EMERG,
            V_ERROR: syslog.LOG_ERR,
            V_WARNING: syslog.LOG_WARNING,
            V_INFO: syslog.LOG_INFO,
            V_DEBUG: syslog.LOG_DEBUG,
        }.get(minlevel, syslog.LOG_INFO)
        syslog.openlog(scriptname)
        try:
            syslog.syslog(priority, _s(msg))
        finally:
            syslog.closelog()
    elif check_verbosity(minlevel):
        msg = ''
        for mc in msgs:
            if isseq(mc):
                m = mc[0:1] and mc[0] or ''
                c = mc[1:2] and mc[1] or ''
                msg += get_color(c) + _u(m) + get_color(RESET)
            else:
                msg += _u(mc)
        tlevel = get_tlevel()
        if tlevel:
            msg = _u(tlevel) + msg
        uprint(msg, nl, flush, out=out)


def __add_suffix(msg, cls, obj, tb, show_tb=False):
    """Append the description of an exception to msg.

    If (cls, obj, tb) describe an exception, the text of obj is appended to
    msg (or replaces it when msg is empty); otherwise msg is left untouched
    (and stays None if it was None). When tb is given and either show_tb or
    show_debug() is true, the formatted traceback is appended as well.

    Returns (msg, nl); nl is False once a traceback has been appended, True
    otherwise.
    """
    nl = True
    if obj:
        if not msg:
            msg = _u(obj)
        else:
            msg = _u(msg) + u': ' + _u(obj)
    if tb is not None and (show_tb or show_debug()):
        lines = format_exception(cls, obj, tb)
        lines.insert(0, '\n')
        if msg is None:
            msg = u''
        msg += u''.join(map(_u, lines))
        nl = False
    return msg, nl


def eerror(msg=None, flush=None, show_tb=False):
    """Display an error message (prefix 'E ') at level V_ERROR.

    When msg is None or show_tb is true, the exception currently being
    handled (sys.exc_info()) is appended to the message.
    """
    nl = True
    if msg is None or show_tb:
        cls, obj, tb = sys.exc_info()
        msg, nl = __add_suffix(msg, cls, obj, tb, show_tb)
    eprint((('E ', 'r,@'), msg), V_ERROR, nl, flush, ERR)


def ewarn(msg, flush=None):
    """Display a warning message (prefix 'W ') at level V_WARNING."""
    eprint((('W ', 'y,@'), msg), V_WARNING, True, flush, ERR)


def enote(msg, flush=None):
    """Display a note (prefix 'N ') at level V_INFO."""
    eprint((('N ', 'g,@'), msg), V_INFO, True, flush, ERR)


def eimportant(msg, flush=None):
    """Display an important message (prefix '! ') at level V_ERROR."""
    eprint((('! ', 'r,@'), msg), V_ERROR, True, flush, ERR)


def eattention(msg, flush=None):
    """Display an attention message (prefix '* ') at level V_WARNING."""
    eprint((('* ', 'y,@'), msg), V_WARNING, True, flush, ERR)


def einfo(msg, flush=None):
    """Display an informational message (prefix 'I ') at level V_INFO."""
    eprint((('I ', 'b,@'), msg), V_INFO, True, flush, ERR)


def eecho(msg, flush=None):
    """Display msg without prefix, followed by a newline, at level V_INFO."""
    eprint(msg, V_INFO, True, flush, ERR)


def eecho_(msg, flush=None):
    """Display msg without prefix and without newline, at level V_INFO."""
    eprint(msg, V_INFO, False, flush, ERR)


def edebug(msg, flush=None):
    """Display a debug message (prefix 'D ') at level V_DEBUG."""
    eprint((('D ', 'w,@'), msg), V_DEBUG, True, flush, ERR)


def get_ntlevel():
    """Return the numeric indentation level (two characters per level)."""
    # Floor division keeps the result an int on Python 3 as well.
    return len(get_tlevel()) // 2


def set_ntlevel(l):
    """Set the indentation prefix to l levels."""
    # get_ntlevel() divides the prefix length by 2, so each level must be
    # two characters wide; a single space per level would cap the level at 1.
    set_tlevel(l * '  ')


def inc_ntlevel():
    """Increase the indentation by one level, unless the stack is empty."""
    if get_estack() != '':
        set_ntlevel(get_ntlevel() + 1)


def dec_ntlevel():
    """Decrease the indentation by one level, never going below zero."""
    l = get_ntlevel()
    if l > 0:
        set_ntlevel(l - 1)


def get_estack():
    """Return the section stack stored in the '__estack' variable."""
    return os.environ.get('__estack', '')


def set_estack(s):
    """Store the section stack s in the '__estack' variable."""
    os.environ['__estack'] = s


def add_estack(c):
    """Push the section marker c onto the stack (stored as ':' + c)."""
    set_estack('%s:%s' % (get_estack(), c))


def check_estack(c):
    """Return True when the last character of the stack is c."""
    return get_estack()[-1:] == c


def rem_estack(c):
    """Pop the marker c (and its ':' separator) if it ends the stack."""
    estack = get_estack()
    if estack.endswith(c):
        set_estack(estack[:-2])


def _result(msg, func_or_result, *args, **kw):
    """Evaluate func_or_result and return (r, msg, exc_info).

    If func_or_result is callable it is called with *args and **kw. When the
    call raises, r is False, the exception description is appended to msg and
    exc_info is (cls, obj) so that the caller can re-raise it with _raise().
    Otherwise func_or_result itself is the result. A None result is turned
    into True.
    """
    r = None
    exc_info = (None, None)
    if callable(func_or_result):
        func = func_or_result
        try:
            r = func(*args, **kw)
        except:
            # Deliberately catch everything: the exception is reported by the
            # caller and then re-raised through _raise().
            r = False
            cls, obj, tb = sys.exc_info()
            msg, _ = __add_suffix(msg, cls, obj, tb)
            del tb
            exc_info = (cls, obj)
    else:
        r = func_or_result
    if r is None:
        r = True
    return r, msg, exc_info


def _raise(exc_info):
    """Re-raise the exception described by exc_info, if there is one."""
    cls, obj = exc_info[:2]
    if cls is None and obj is None:
        pass
    elif isinstance(obj, cls):
        raise obj
    else:
        raise cls(obj)


def etitle(title, func_or_result=None, *args, **kw):
    """Start a titled section (prefix 'T ') and indent what follows.

    If func_or_result is given, the section is closed right away with
    eend(func_or_result, *args, **kw).
    """
    inc_ntlevel()
    add_estack('t')
    eprint((('T ', 'b,@'), (title, 'b,@,_')), V_INFO, True, True, ERR)
    if func_or_result is not None:
        eend(func_or_result, *args, **kw)


def estep(msg, func_or_result=None, flush=None):
    """Display a step message (prefix '. ') followed by a newline.

    If func_or_result is callable it is called first; an exception it raises
    is appended to the message and re-raised after the display.
    """
    r, msg, exc_info = _result(msg, func_or_result)
    eprint((('. ', 'w,@'), msg), V_INFO, True, flush, ERR)
    _raise(exc_info)


def estep_(msg, func_or_result=None, flush=None):
    """Same as estep(), but without the trailing newline."""
    r, msg, exc_info = _result(msg, func_or_result)
    eprint((('. ', 'w,@'), msg), V_INFO, False, flush, ERR)
    _raise(exc_info)


def ebegin(title, func_or_result=None, *args, **kw):
    """Start an operation: display '. <title>: ' without newline.

    If func_or_result is given, the operation is closed right away with
    eend(func_or_result, *args, **kw).
    """
    add_estack('b')
    eprint((('. ', 'w,@'), title + ': '), V_INFO, False, True, ERR)
    if func_or_result is not None:
        eend(func_or_result, *args, **kw)


def _edoto():
    """Display the success progress mark '.'."""
    eprint('.', V_INFO, False, True, ERR)


def _edotx():
    """Display the failure progress mark 'x'."""
    eprint((('x', 'r,@'),), V_INFO, False, True, ERR)


def _edotd(msg):
    """Display msg between parentheses, followed by a newline."""
    msg = list(seqof(msg, []))
    msg.insert(0, '(')
    msg.append(')')
    eprint(msg, V_INFO, True, True, ERR)


def edot(msg=None, func_or_result=None, *args, **kw):
    """Display a progress mark for one step of an operation.

    The mark is '.' when the result is true and 'x' otherwise; in debug mode
    msg is displayed too. An exception raised by func_or_result is always
    re-raised, even when the verbosity hides the progress marks.
    """
    r, msg, exc_info = _result(msg, func_or_result, *args, **kw)
    if show_info():
        if r:
            _edoto()
        else:
            _edotx()
        if msg and show_debug():
            _edotd(msg)
    # Must run regardless of the verbosity, otherwise the captured exception
    # would be silently swallowed in quiet mode.
    _raise(exc_info)


def _eendo():
    """Display the success end mark '[ok]'."""
    eprint((('[ok]', 'g,@'),), V_INFO, True, True, ERR)


def _eendx():
    """Display the failure end mark '[error]'."""
    eprint((('[error]', 'r,@'),), V_INFO, True, True, ERR)


def eend(func_or_result=None, *args, **kw):
    """Close the section opened last by ebegin() or etitle().

    For an ebegin() section, '[ok]' or '[error]' is displayed depending on
    the result. For an etitle() section, the indentation is decreased. An
    exception raised by func_or_result is re-raised at the end.
    """
    r, msg, exc_info = _result(None, func_or_result, *args, **kw)
    if check_estack('b'):
        # close a section started by ebegin
        rem_estack('b')
        if show_info():
            if r:
                _eendo()
            else:
                _eendx()
            if msg and show_debug():
                _edotd(msg)
    elif check_estack('t'):
        # close a section started by etitle
        rem_estack('t')
        dec_ntlevel()
    _raise(exc_info)


def resets():
    """Clear the section stack and reset the indentation level."""
    set_estack('')
    set_ntlevel(0)