nutools/lib/nulib/python/nulib/lines.py

397 lines
13 KiB
Python

# -*- coding: utf-8 -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
"""Des fonctions pour gérer des lignes de texte.
"""
__all__ = ('BLines', 'Lines')
import os, re
from types import MethodType, FunctionType
from .base import isstr, isseq
from .uio import _s, _u, UnicodeIO, defaultIO
from .encdetect import detect_line_encoding, guess_line_encoding
from .functions import apply_args
class BLines(list):
"""Une liste de chaines de type str.
"""
_valid = False
def is_valid(self): return self._valid
valid = property(is_valid)
_nl = None
def get_nl(self): return self._nl
def set_nl(self, nl): self._nl = nl
nl = property(get_nl, set_nl)
def __strip_nl(self, line):
nl = ''
while line[-1:] in ('\r', '\n'):
nl = line[-1:] + nl
line = line[:-1]
if self._nl is None and nl:
self._nl = nl
return line
def __add_nl(self, line):
if self._nl is None: self._nl = os.linesep
return line + self._nl
def __init__(self, lines=None, nl=None):
if nl is not None: self._nl = nl
if lines is None:
pass
elif isseq(lines):
self[:] = map(self.__strip_nl, lines)
self._valid = True
if self._nl is None: self._nl = os.linesep
elif isstr(lines):
self.appendtext(lines)
if self._nl is None: self._nl = os.linesep
else:
raise ValueError("Unsupported type: %s" % type(lines))
def reset(self):
"""Vider toutes les lignes de cet objet.
"""
self[:] = []
self._valid = False
self._nl = None
def _new(self, list=None):
u"""Retourner une instance vide de cette classe, avec les mêmes paramètres pour nl.
"""
return self.__class__(list, self._nl)
def copy(self):
"""Faire et retourner une copie de cet objet.
"""
return self._new(self)
RE_NLs = re.compile(r'(\r?\n|\r)')
def appendtext(self, text, strip_last_nl=False):
"""Ajouter un texte éventuellement sur plusieurs lignes.
Si strip_last_nl, ignorer la dernière ligne vide.
Equivalent en gros à self.extend(test.split(r'\r?\n|\r'))
@return: self
"""
lines = self.RE_NLs.split(text)
if strip_last_nl and not lines[-1]: del lines[-1]
i, max = 0, len(lines) - 1
while i <= max:
if i == max:
line = lines[i]
i += 1
else:
line = lines[i] + lines[i + 1]
i += 2
self.append(self.__strip_nl(line))
return self
def _after_readlines(self, lines, uio):
return lines
def readlines(self, inf, raise_exception=True, uio=None, open_func=None, until=None):
"""Lire les lignes du fichier.
@param until: fonction qui si elle retourne True, arrête la lecture. La
ligne qui arrête la lecture est incluse dans la liste. La signature
de la fonction est until(line[, lines]).
@return: self
"""
self.reset()
close_inf = False
if isstr(inf):
if open_func is None: open_func = open
inf = open_func(inf, 'rb')
close_inf = True
try:
try:
if until:
lines = []
while True:
line = inf.readline()
if not line: break
line = self.__strip_nl(line)
lines.append(line)
if apply_args(until, line, lines): break
else:
lines = map(self.__strip_nl, inf.readlines())
lines = self._after_readlines(lines, uio)
self.extend(lines)
self._valid = True
except IOError:
if raise_exception: raise
finally:
if close_inf: inf.close()
if self._nl is None: self._nl = os.linesep
return self
def _before_writelines(self, lines, uio):
if uio is None: uio = defaultIO
return map(uio.s, lines)
def writelines(self, outf, uio=None, open_func=None):
"""Ecrire les lignes dans le fichier
"""
close_outf = False
if isstr(outf):
if open_func is None: open_func = open
outf = open_func(outf, 'wb')
close_outf = True
try:
lines = self._before_writelines(self, uio)
outf.writelines(map(self.__add_nl, lines))
finally:
if close_outf: outf.close()
def _compile(self, forp):
if isstr(forp): forp = re.compile(forp)
return forp
def __apply(self, func, line, index, lines, args):
f = func
ac_offset = 0
if type(f) is MethodType:
f = f.im_func
ac_offset = 1
if type(f) is not FunctionType:
raise ValueError("func must be a function")
argcount = f.func_code.co_argcount - ac_offset
args = (line, index, lines, args)[:argcount]
return func(*args)
def _matches(self, forp, line, index=None, lines=None, args=None):
if callable(forp): return self.__apply(forp, line, index, lines, args)
else: return forp.match(line) is not None
def grepi(self, forp, indexes=None, inverse=False, boundaries=False, **args):
"""forp étant une fonction ou une expression régulière, retourner une
liste d'index de lignes pour lesquelles la fonction retourne True ou
qui correspondent à l'expression régulière forp.
Si inverse==True, la logique est inversée (on ne retourne que les
indexes des lignes qui ne matches pas.)
Si indexes n'est pas None, la recherche se limite aux lignes dont
l'index est dans cette liste. Sinon, si boundaries==True, ne rechercher
qu'au début et à la fin de la liste (i.e. s'arrêter de part et d'autre
dès que la fonction retourne False ou que le pattern ne matche pas.)
@return: une liste d'indexes
@rtype: list
"""
if indexes is None:
indexes = range(len(self))
else:
# ne pas chercher sur les boundaries si on donne indexes
boundaries = False
forp = self._compile(forp)
min = 0
matches = []
for index in indexes:
line = self[index]
found = self._matches(forp, line, index, self, args)
if inverse: found = not found
if found:
matches.append(index)
elif boundaries:
min = index
break
if boundaries:
index = len(self) - 1
pos = len(matches)
while index > min:
line = self[index]
found = self._matches(forp, line, index, self, args)
if inverse: found = not found
if found:
matches.insert(pos, index)
index -= 1
else:
break
return matches
def grep(self, forp, indexes=None, inverse=False, boundaries=False, **args):
"""forp étant une fonction ou une expression régulière, retourner les
lignes pour lesquelles la fonction retourne True ou qui correspondent
à l'expression régulière forp.
Consulter l'aide de la fonction grepi pour des détails sur les
paramètres.
@return: une liste de chaines
@rtype: list
"""
indexes = self.grepi(forp, indexes, inverse, boundaries, **args)
return self._new(map(lambda i: self[i], indexes))
def replace(self, pattern, repl, count=0, indexes=None):
"""Remplacer dans toute les lignes dont l'index est dans la liste
indexes (toutes les lignes par défaut), l'expression régulière pattern
par repl.
@return: le nombre de remplacements effectués
@rtype: int
"""
if indexes is None: indexes = range(len(self))
pattern = self._compile(pattern)
nbrepl = 0
for index in indexes:
line = self[index]
line, nb = pattern.subn(repl, line, count)
self[index] = line
nbrepl = nbrepl + nb
return nbrepl
def map(self, func, copy=False, **args):
"""Pour chacune des lignes, appeler la fonction func avec les arguments
suivants dans cet ordre:
line = la ligne
index = l'index dans la liste
lines = cet objet (ou sa copie)
args = le dictionnaire des paramètres supplémentaires
Si copy==True, opérer sur une copie de cet objet, sinon les lignes sont
modifiées en place.
@return: la liste résultat
@rtype: list
"""
if copy: lines = self.copy()
else: lines = self
for index in range(len(lines)):
lines[index] = self.__apply(func, lines[index], index, lines, args)
return lines
def filter(self, forp, copy=False, inverse=False, boundaries=False, **args):
"""Pour chacune des lignes, essayer de matcher avec l'expression
régulière forp, ou appeler la fonction forp avec les arguments
suivants dans cet ordre:
line = la ligne
index = l'index dans la liste
lines = cet objet (ou sa copie)
args = le dictionnaire des paramètres supplémentaires
Si l'expression régulière matche, ou si la fonction retourne True,
la valeur est gardée, sinon elle est supprimée. Cette logique est
inversée si inverse==True.
Si copy==True, opérer sur une copie de cet objet, sinon les lignes sont
modifiées en place. Si boundaries==True, n'opérer qu'au début et à la
fin de la liste (i.e. s'arrêter de part et d'autre dès que la fonction
retourne True.)
@return: la liste résultat
@rtype: list
"""
if copy: lines = self.copy()
else: lines = self
forp = self._compile(forp)
index = 0
min = 0
max = len(lines)
while index < max:
line = lines[index]
match = self._matches(forp, line, index, lines, args)
if inverse: match = not match
if match:
if boundaries:
min = index
break
else:
index += 1
else:
del lines[index]
max -= 1
if boundaries:
index = len(lines) - 1
while index > min:
line = lines[index]
match = self._matches(forp, line, index, lines, args)
if inverse: match = not match
if match:
break
else:
del lines[index]
index -= 1
return lines
def join(self, lastnl=False):
"""Obtenir les lignes jointes avec self.nl
Si lastnl==True, la dernière ligne se termine par nl. Cela est
approprié quand on veut par exemple générer un fichier.
"""
if lastnl:
return ''.join(map(self.__add_nl, self))
else:
nl = self._nl
if nl is None: nl = os.linesep
return nl.join(self)
class Lines(BLines):
"""Une liste de chaines de type unicode.
"""
__use_defaultIO = []
__dont_convert = []
def detect_encoding(lines):
encoding = detect_line_encoding(lines)
if encoding is None:
encoding = guess_line_encoding(lines,
Lines.__use_defaultIO,
Lines.__dont_convert)
return encoding
detect_encoding = staticmethod(detect_encoding)
_uio = None
def get_uio(self): return self._uio
def set_uio(self, uio): self._uio = uio
uio = property(get_uio, set_uio)
_encoding_detector = detect_encoding
def get_encoding_detector(self): return self._encoding_detector
def set_encoding_detector(self, encoding_detector): self._encoding_detector = encoding_detector
encoding_detector = property(get_encoding_detector, set_encoding_detector)
def __init__(self, lines=None, nl=None, uio=None, encoding_detector=None):
if uio is not None: self._uio = uio
if encoding_detector is not None:
self._encoding_detector = encoding_detector
BLines.__init__(self, lines, nl)
def _after_readlines(self, lines, uio):
set_uio = uio is None
if uio is None: uio = self._uio
if uio is None:
encoding_detector = self._encoding_detector
if encoding_detector is not None:
encoding = encoding_detector(lines)
if encoding is self.__use_defaultIO:
uio = defaultIO
if set_uio: self._uio = uio
elif encoding is self.__dont_convert:
pass
else:
uio = UnicodeIO(encoding)
if set_uio: self._uio = uio
if uio is not None:
lines = map(uio.u, lines)
return lines
def _before_writelines(self, lines, uio):
if uio is None: uio = self._uio
if uio is None: uio = defaultIO
return map(uio.s, lines)