311 lines
12 KiB
Python
311 lines
12 KiB
Python
# -*- coding: utf-8 -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
|
|
|
|
"""Des fonctions pour l'affichage colorisé.
|
|
"""
|
|
|
|
# Names exported by ``from <module> import *``.
__all__ = ('IN_ISATTY', 'OUT_ISATTY', 'ERR_ISATTY',
           # ANSI attribute codes
           'RESET',
           'BLACK', 'RED', 'GREEN', 'YELLOW', 'BLUE', 'MAGENTA', 'CYAN', 'WHITE', 'DEFAULT',
           'BLACK_BG', 'RED_BG', 'GREEN_BG', 'YELLOW_BG', 'BLUE_BG', 'MAGENTA_BG', 'CYAN_BG', 'WHITE_BG', 'DEFAULT_BG',
           'BOLD', 'FAINT', 'UNDERLINED', 'REVERSE', 'NORMAL',
           'get_color', 'get_colored',
           # verbosity levels and checks
           'V_SYSLOG', 'V_NONE', 'V_ERROR', 'V_WARNING', 'V_INFO', 'V_DEBUG',
           'set_verbosity', 'check_verbosity', 'show_error', 'show_warn', 'show_info', 'show_debug',
           # message printing helpers
           'eprint', 'eerror', 'ewarn', 'enote', 'eimportant', 'eattention', 'einfo', 'eecho', 'eecho_', 'edebug',
           'etitle', 'estep', 'estep_', 'ebegin', 'edot', 'eend',
           )
|
|
|
|
import os, sys, re
|
|
try: import syslog
|
|
except: syslog = None
|
|
from traceback import format_exception
|
|
|
|
from .base import isnum, isseq, seqof, myname
|
|
from .uio import _u, _s, uprint
|
|
|
|
################################################################################
|
|
def is_debug():
    """Tell whether the ``DEBUG`` environment variable holds a non-empty value."""
    debug_flag = os.environ.get('DEBUG', '')
    return debug_flag != ''
|
|
|
|
################################################################################
|
|
# Colors
|
|
|
|
def _isatty(f):
|
|
try: return f.isatty()
|
|
except: return False
|
|
# Standard streams captured at import time, each with a flag telling whether
# it is attached to a terminal.
IN = sys.stdin
IN_ISATTY = _isatty(IN)
OUT = sys.stdout
OUT_ISATTY = _isatty(OUT)
ERR = sys.stderr
ERR_ISATTY = _isatty(ERR)

# Colors are enabled only when both stdout and stderr are terminals and the
# UTOOLS_NO_COLORS environment variable is unset or empty.
USE_COLORS = OUT_ISATTY and ERR_ISATTY and not os.environ.get('UTOOLS_NO_COLORS', '')
|
|
# Attribute codes, kept as strings so they can be joined with ';' inside an
# escape sequence.
RESET = '0'

# Foreground colors.
BLACK = '30'
RED = '31'
GREEN = '32'
YELLOW = '33'
BLUE = '34'
MAGENTA = '35'
CYAN = '36'
WHITE = '37'
DEFAULT = '39'

# Background colors.
BLACK_BG = '40'
RED_BG = '41'
GREEN_BG = '42'
YELLOW_BG = '43'
BLUE_BG = '44'
MAGENTA_BG = '45'
CYAN_BG = '46'
WHITE_BG = '47'
DEFAULT_BG = '49'

# Text attributes.
BOLD = '1'
FAINT = '2'
UNDERLINED = '4'
REVERSE = '7'
NORMAL = '22'

# Separator of a comma-separated color specification such as 'r,@'.
RE_COMMA = re.compile(r'\s*,\s*')
# A specification made only of digits is used as a raw attribute code.
RE_COLOR = re.compile(r'[0-9]+$')

# Lower-case long names -> attribute code.
# 'default' and 'default_bg' are included so that these names resolve to the
# DEFAULT / DEFAULT_BG codes instead of being parsed character by character
# through COLOR_MAP.
LONG_COLOR_MAP = {
    'reset': RESET,
    'black': BLACK, 'red': RED, 'green': GREEN, 'yellow': YELLOW,
    'blue': BLUE, 'magenta': MAGENTA, 'cyan': CYAN, 'white': WHITE,
    'default': DEFAULT,
    'black_bg': BLACK_BG, 'red_bg': RED_BG, 'green_bg': GREEN_BG, 'yellow_bg': YELLOW_BG,
    'blue_bg': BLUE_BG, 'magenta_bg': MAGENTA_BG, 'cyan_bg': CYAN_BG, 'white_bg': WHITE_BG,
    'default_bg': DEFAULT_BG,
    'bold': BOLD, 'faint': FAINT, 'underlined': UNDERLINED, 'reverse': REVERSE, 'normal': NORMAL,
}

# One-character codes -> attribute code (lower case: foreground, upper case:
# background, punctuation: text attributes).
COLOR_MAP = {
    'z': RESET,
    'o': BLACK, 'r': RED, 'g': GREEN, 'y': YELLOW,
    'b': BLUE, 'm': MAGENTA, 'c': CYAN, 'w': WHITE,
    'O': BLACK_BG, 'R': RED_BG, 'G': GREEN_BG, 'Y': YELLOW_BG,
    'B': BLUE_BG, 'M': MAGENTA_BG, 'C': CYAN_BG, 'W': WHITE_BG,
    '@': BOLD, '-': FAINT, '_': UNDERLINED, '~': REVERSE, 'n': NORMAL,
}
|
|
|
|
def get_color(*colors):
    """Build the escape sequence selecting the given colors/attributes.

    Each argument may be a numeric attribute code ('31'), a long name listed
    in LONG_COLOR_MAP ('red', case-insensitive) or a string of one-character
    codes from COLOR_MAP ('r@'); unknown characters are ignored. A single
    argument may also be a comma-separated list of such items ('r,@'), or an
    already built escape sequence, which is returned unchanged.

    Returns '' when colors are disabled (USE_COLORS is false) or when the
    only argument is None. Without arguments the reset sequence is returned.
    None values among several arguments are skipped.
    """
    if not USE_COLORS:
        return ''
    if not colors:
        colors = (RESET,)
    if len(colors) == 1:
        color = colors[0]
        if color is None:
            return ""
        elif color.startswith("\033["):
            return color
        elif "," in color:
            colors = RE_COMMA.split(color)

    numcolors = []
    for color in colors:
        if color is None:
            # nothing to select for this item (used to raise TypeError)
            continue
        if RE_COLOR.match(color):
            numcolors.append(color)
            continue
        lcolor = color.lower()
        # `in` replaces dict.has_key(), which no longer exists in Python 3
        if lcolor in LONG_COLOR_MAP:
            numcolors.append(LONG_COLOR_MAP[lcolor])
        else:
            # short codes are case-sensitive: look up the original spelling
            numcolors.extend(COLOR_MAP[c] for c in color if c in COLOR_MAP)
    return "\033[%sm" % ';'.join(numcolors)
|
|
|
|
def get_colored(s, *colors):
    """Return `s` wrapped between the sequence for `colors` and the reset sequence."""
    opening = get_color(*colors)
    closing = get_color(RESET)
    return opening + s + closing
|
|
|
|
################################################################################
|
|
# Verbosity level, for tracing and for display
|
|
|
|
# Verbosity levels, from least to most talkative. V_SYSLOG is a special value
# below V_NONE: eprint() sends messages to syslog when it is the current level.
V_SYSLOG = -1
V_NONE = 0
V_ERROR = 1
V_WARNING = 2
V_INFO = 3
V_DEBUG = 4

# Bounds used by inc_verbosity()/dec_verbosity(), and the default level used
# when none is configured.
V_MIN = V_NONE
V_DEF = V_INFO
V_MAX = V_DEBUG
|
|
|
|
def get_verbosity():
    """Return the current verbosity level as an int.

    The level is read from the ``__verbosity`` environment variable. V_DEF is
    returned when the variable is missing or does not hold an integer, so
    that a stray value in the environment cannot make this module fail.
    """
    raw = os.environ.get('__verbosity', V_DEF)
    try:
        return int(raw)
    except ValueError:
        return V_DEF
|
|
def _set_verbosity(v): os.environ['__verbosity'] = str(v)
|
|
def inc_verbosity():
    """Raise the verbosity by one level, never going above V_MAX."""
    current = get_verbosity()
    if current >= V_MAX:
        return
    _set_verbosity(current + 1)


def dec_verbosity():
    """Lower the verbosity by one level, never going below V_MIN."""
    current = get_verbosity()
    if current <= V_MIN:
        return
    _set_verbosity(current - 1)
|
|
def set_verbosity(v):
    """Set the verbosity from a numeric level or a command-line option.

    `v` may be:
    - a number (as recognized by isnum()): stored as the new level;
    - an empty/None value, '-c' or '--default': V_DEF;
    - '-Q' / '--very-quiet': V_MIN;
    - '-D' / '--debug': V_MAX;
    - '-q' / '--quiet': one level down;
    - '-v' / '--verbose': one level up.

    Returns True when `v` was recognized, False otherwise (the verbosity is
    then left untouched).
    """
    if isnum(v):
        # Numeric levels used to fall through to the final `else` and were
        # rejected, although the emptiness test explicitly spares them.
        # int() keeps the stored text parseable by get_verbosity().
        v = int(v)
    elif not v:
        v = V_DEF
    elif v in ('-Q', '--very-quiet'):
        v = V_MIN
    elif v in ('-q', '--quiet'):
        # report success like the other recognized options (used to be None)
        dec_verbosity()
        return True
    elif v in ('-v', '--verbose'):
        inc_verbosity()
        return True
    elif v in ('-c', '--default'):
        v = V_DEF
    elif v in ('-D', '--debug'):
        v = V_MAX
    else:
        return False
    _set_verbosity(v)
    return True
|
|
|
|
# Command-line options that change the verbosity (short and long spellings).
VERBOSITY_OPTS = (
    '-Q', '--very-quiet',
    '-q', '--quiet',
    '-v', '--verbose',
    '-D', '--debug',
)
|
|
|
|
# A string made only of digits is treated as a numeric level.
RE_NUM = re.compile(r'\d+$')


def check_verbosity(v=V_INFO):
    """Tell whether messages of level `v` are shown at the current verbosity.

    `v` is either a level (a number or a string of digits) or one of the
    verbosity options, each of which maps to the level it requires. For
    '-D'/'--debug' the answer is also true when is_debug() is. Any other
    value yields True.
    """
    if isnum(v) or RE_NUM.match(v) is not None:
        return get_verbosity() >= int(v)

    if v in ('-Q', '--very-quiet'):
        required = V_ERROR
    elif v in ('-q', '--quiet'):
        required = V_WARNING
    elif v in ('-c', '--default'):
        required = V_INFO
    elif v in ('-v', '--verbose'):
        required = V_DEBUG
    elif v in ('-D', '--debug'):
        return is_debug() or get_verbosity() >= V_DEBUG
    else:
        return True
    return get_verbosity() >= required
|
|
def show_error():
    """Tell whether error messages are displayed at the current verbosity."""
    return check_verbosity(V_ERROR)


def show_warn():
    """Tell whether warnings are displayed at the current verbosity."""
    return check_verbosity(V_WARNING)


def show_info():
    """Tell whether informational messages are displayed at the current verbosity."""
    return check_verbosity(V_INFO)


def show_debug():
    """Tell whether debug messages are displayed (debug mode or V_DEBUG verbosity)."""
    if is_debug():
        return True
    return check_verbosity(V_DEBUG)
|
|
|
|
# Write the current level back so that __verbosity is always present in
# os.environ (set to the default when it was missing).
_set_verbosity(get_verbosity())
|
|
|
|
################################################################################
|
|
# Message display
|
|
|
|
def get_tlevel():
    """Return the prefix stored in the ``__tlevel`` environment variable ('' when unset)."""
    prefix = os.environ.get('__tlevel', '')
    return prefix


def set_tlevel(l):
    """Store `l` in the ``__tlevel`` environment variable."""
    os.environ['__tlevel'] = l
|
|
|
|
def _eprint_item(seq, index):
    """Return ``seq[index]`` when it exists and is truthy, '' otherwise."""
    if seq[index:index + 1]:
        return seq[index] or ''
    return ''


def eprint(msg, minlevel=V_INFO, nl=False, flush=None, out=None):
    """Display a message made of one or several parts.

    `msg` is passed through seqof(); each resulting part is either a plain
    value or a sequence ``(text, color)`` whose text is wrapped in the escape
    sequences given by get_color().

    When the current verbosity is V_SYSLOG and the syslog module is
    available, the uncolored text goes to syslog with a priority derived from
    `minlevel`; messages without a final newline (`nl` false) are dropped.
    Otherwise the message is handed to uprint(), prefixed by the current
    tlevel, provided check_verbosity(minlevel) allows it.
    """
    parts = seqof(msg, [])

    if get_verbosity() == V_SYSLOG and syslog is not None:
        if not nl:
            return  # incomplete lines are ignored for syslog
        text = ''
        for part in parts:
            if isseq(part):
                text += _u(_eprint_item(part, 0))
            else:
                text += _u(part)
        depth = get_ntlevel()
        if depth > 0:
            text = u'[%i] %s' % (depth, text)
        syslog.openlog(myname)
        try:
            priorities = {
                V_NONE: syslog.LOG_EMERG,
                V_ERROR: syslog.LOG_ERR,
                V_WARNING: syslog.LOG_WARNING,
                V_INFO: syslog.LOG_INFO,
                V_DEBUG: syslog.LOG_DEBUG,
            }
            syslog.syslog(priorities.get(minlevel, syslog.LOG_INFO), _s(text))
        finally:
            syslog.closelog()
        return

    if not check_verbosity(minlevel):
        return

    text = ''
    for part in parts:
        if isseq(part):
            body = _eprint_item(part, 0)
            color = _eprint_item(part, 1)
            text += get_color(color) + _u(body) + get_color(RESET)
        else:
            text += _u(part)

    indent = get_tlevel()
    if indent:
        text = _u(indent) + text
    uprint(text, nl, flush, out=out)
|
|
|
|
def __add_suffix(msg, cls, obj, tb, show_tb=False):
    """Append the description of an exception to `msg`.

    When (cls, obj, tb) describe an exception, `obj` is appended to `msg`
    after ': ' (or replaces an empty `msg`), and the formatted traceback is
    added when `tb` is available and either `show_tb` or show_debug() is
    true. Otherwise `msg` is returned unchanged (None stays None).

    Returns ``(msg, nl)``; `nl` is False when a traceback was appended and
    True in every other case.
    """
    if obj:
        msg = _u(msg) + u': ' + _u(obj) if msg else _u(obj)

    with_traceback = tb is not None and (show_tb or show_debug())
    if not with_traceback:
        return msg, True

    lines = ['\n'] + format_exception(cls, obj, tb)
    if msg is None:
        msg = u''
    msg += u''.join(_u(line) for line in lines)
    return msg, False
|
|
|
|
def eerror(msg=None, flush=None, show_tb=False):
    """Display an error message on ERR at level V_ERROR.

    When `msg` is None or `show_tb` is true, the information returned by
    sys.exc_info() is merged into the message by __add_suffix().
    """
    nl = True
    if show_tb or msg is None:
        exc_cls, exc_obj, exc_tb = sys.exc_info()
        msg, nl = __add_suffix(msg, exc_cls, exc_obj, exc_tb, show_tb)
    prefix = ('E ', 'r,@')
    eprint((prefix, msg), V_ERROR, nl, flush, ERR)
|
|
def ewarn(msg, flush=None):
    """Display a warning ('W ' prefix) on ERR at level V_WARNING."""
    prefix = ('W ', 'y,@')
    eprint((prefix, msg), V_WARNING, True, flush, ERR)


def enote(msg, flush=None):
    """Display a note ('N ' prefix) on ERR at level V_INFO."""
    prefix = ('N ', 'g,@')
    eprint((prefix, msg), V_INFO, True, flush, ERR)


def eimportant(msg, flush=None):
    """Display an important message ('! ' prefix) on ERR at level V_ERROR."""
    prefix = ('! ', 'r,@')
    eprint((prefix, msg), V_ERROR, True, flush, ERR)


def eattention(msg, flush=None):
    """Display an attention message ('* ' prefix) on ERR at level V_WARNING."""
    prefix = ('* ', 'y,@')
    eprint((prefix, msg), V_WARNING, True, flush, ERR)


def einfo(msg, flush=None):
    """Display an informational message ('I ' prefix) on ERR at level V_INFO."""
    prefix = ('I ', 'b,@')
    eprint((prefix, msg), V_INFO, True, flush, ERR)


def eecho(msg, flush=None):
    """Display `msg` without prefix on ERR at level V_INFO, ending the line."""
    eprint(msg, V_INFO, True, flush, ERR)


def eecho_(msg, flush=None):
    """Display `msg` without prefix on ERR at level V_INFO, without ending the line."""
    eprint(msg, V_INFO, False, flush, ERR)


def edebug(msg, flush=None):
    """Display a debug message ('D ' prefix) on ERR at level V_DEBUG."""
    prefix = ('D ', 'w,@')
    eprint((prefix, msg), V_DEBUG, True, flush, ERR)
|
|
|
|
def get_ntlevel():
    """Return the nesting depth, i.e. half the length of the tlevel prefix.

    Floor division keeps the result an int on Python 3 as well; a float
    would break set_ntlevel(), which multiplies a string by the depth.
    """
    return len(get_tlevel()) // 2


def set_ntlevel(l):
    """Set the nesting depth to `l` by storing a prefix of two spaces per level.

    Two characters per level match get_ntlevel(), which divides the prefix
    length by 2; with a single space per level the depth read back was
    always 0, so nesting could neither grow past one space nor be undone.
    """
    set_tlevel(l * '  ')
|
|
def inc_ntlevel():
    """Increase the nesting depth by one, but only when the estack is not empty."""
    if get_estack() == '':
        return
    set_ntlevel(get_ntlevel() + 1)


def dec_ntlevel():
    """Decrease the nesting depth by one, never going below zero."""
    depth = get_ntlevel()
    if depth <= 0:
        return
    set_ntlevel(depth - 1)
|
|
|
|
def get_estack():
    """Return the section stack stored in the ``__estack`` environment variable ('' when unset)."""
    stack = os.environ.get('__estack', '')
    return stack


def set_estack(s):
    """Store `s` in the ``__estack`` environment variable."""
    os.environ['__estack'] = s


def add_estack(c):
    """Push marker `c` on the stack, as ':' followed by `c`."""
    pushed = '%s:%s' % (get_estack(), c)
    set_estack(pushed)


def check_estack(c):
    """Tell whether the last character of the stack equals `c`."""
    last = get_estack()[-1:]
    return last == c


def rem_estack(c):
    """Pop the top marker (its last two characters) when the stack ends with `c`."""
    stack = get_estack()
    if not stack.endswith(c):
        return
    set_estack(stack[:-2])
|
|
|
|
def _result(msg, func_or_result, *args, **kw):
|
|
r = None
|
|
exc_info = (None, None)
|
|
if callable(func_or_result):
|
|
func = func_or_result
|
|
try:
|
|
r = func(*args, **kw)
|
|
except:
|
|
r = False
|
|
cls, obj, tb = sys.exc_info()
|
|
msg, _ = __add_suffix(msg, cls, obj, tb)
|
|
del tb
|
|
exc_info = (cls, obj)
|
|
else:
|
|
r = func_or_result
|
|
if r is None: r = True
|
|
return r, msg, exc_info
|
|
def _raise(exc_info):
|
|
cls, obj = exc_info[:2]
|
|
if cls is None and obj is None: pass
|
|
elif isinstance(obj, cls): raise obj
|
|
else: raise cls(obj)
|
|
def etitle(title, func_or_result=None, *args, **kw):
    """Open a titled section and display its title on ERR at level V_INFO.

    The nesting depth is increased through inc_ntlevel() and a 't' marker is
    pushed on the estack before the title is printed. When `func_or_result`
    is given, the section is closed right away through
    ``eend(func_or_result, *args, **kw)``.
    """
    # the unused `estack = get_estack()` local of the original was dropped
    inc_ntlevel()
    add_estack('t')
    eprint((('T ', 'b,@'), (title, 'b,@,_')), V_INFO, True, True, ERR)
    if func_or_result is not None:
        eend(func_or_result, *args, **kw)
|
|
def estep(msg, func_or_result=None, flush=None):
    """Display a step ('. ' prefix) on ERR at level V_INFO, ending the line.

    `func_or_result` is evaluated by _result() first; an exception it raised
    is appended to the message and re-raised after the display.
    """
    _, msg, exc_info = _result(msg, func_or_result)
    prefix = ('. ', 'w,@')
    eprint((prefix, msg), V_INFO, True, flush, ERR)
    _raise(exc_info)


def estep_(msg, func_or_result=None, flush=None):
    """Same as estep(), but without ending the line."""
    _, msg, exc_info = _result(msg, func_or_result)
    prefix = ('. ', 'w,@')
    eprint((prefix, msg), V_INFO, False, flush, ERR)
    _raise(exc_info)
|
|
def ebegin(title, func_or_result=None, *args, **kw):
    """Open an operation: push a 'b' marker and display ``title: `` without newline.

    When `func_or_result` is given, the operation is closed right away
    through ``eend(func_or_result, *args, **kw)``.
    """
    add_estack('b')
    heading = title + ': '
    eprint((('. ', 'w,@'), heading), V_INFO, False, True, ERR)
    if func_or_result is None:
        return
    eend(func_or_result, *args, **kw)
|
|
def _edoto():
    """Display the success mark '.' on ERR, without newline."""
    eprint('.', V_INFO, False, True, ERR)


def _edotx():
    """Display the failure mark 'x' on ERR, without newline."""
    mark = ('x', 'r,@')
    eprint((mark,), V_INFO, False, True, ERR)


def _edotd(msg):
    """Display `msg` between parentheses on ERR, ending the line."""
    parts = ['('] + list(seqof(msg, [])) + [')']
    eprint(parts, V_INFO, True, True, ERR)
|
|
def edot(msg=None, func_or_result=None, *args, **kw):
    """Display the progress mark of one step of an operation.

    `func_or_result` is evaluated by _result(). When INFO messages are
    shown, '.' is displayed for a true outcome and 'x' otherwise, followed
    by `msg` between parentheses when it is set and show_debug() is true.

    An exception raised by `func_or_result` is always re-raised, whether or
    not anything was displayed. It used to be dropped silently when INFO
    messages were hidden, unlike what eend() and estep() do.
    """
    r, msg, exc_info = _result(msg, func_or_result, *args, **kw)
    if show_info():
        if r:
            _edoto()
        else:
            _edotx()
        if msg and show_debug():
            _edotd(msg)
    _raise(exc_info)
|
|
def _eendo():
    """Display the '[ok]' mark on ERR, ending the line."""
    mark = ('[ok]', 'g,@')
    eprint((mark,), V_INFO, True, True, ERR)


def _eendx():
    """Display the '[error]' mark on ERR, ending the line."""
    mark = ('[error]', 'r,@')
    eprint((mark,), V_INFO, True, True, ERR)
|
|
def eend(func_or_result=None, *args, **kw):
    """Close the section opened last by ebegin() or etitle().

    `func_or_result` is evaluated by _result(). For an ebegin() section the
    'b' marker is popped and, when INFO messages are shown, '[ok]' or
    '[error]' is displayed according to the outcome (plus the message
    between parentheses when show_debug() is true). For an etitle() section
    the 't' marker is popped and the nesting depth decreased. An exception
    raised by `func_or_result` is re-raised at the end.
    """
    r, msg, exc_info = _result(None, func_or_result, *args, **kw)
    if check_estack('b'):
        # end of a section started by ebegin
        rem_estack('b')
        if show_info():
            show_mark = _eendo if r else _eendx
            show_mark()
            if msg and show_debug():
                _edotd(msg)
    elif check_estack('t'):
        # end of a section started by etitle
        rem_estack('t')
        dec_ntlevel()
    _raise(exc_info)
|
|
def resets():
    """Empty the section stack and set the nesting depth back to zero."""
    set_estack('')
    set_ntlevel(0)
|