397 lines
13 KiB
Python
397 lines
13 KiB
Python
# -*- coding: utf-8 -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
|
|
|
|
"""Des fonctions pour gérer des lignes de texte.
|
|
"""
|
|
|
|
__all__ = ('BLines', 'Lines')
|
|
|
|
import os, re
|
|
from types import MethodType, FunctionType
|
|
|
|
from .base import isstr, isseq
|
|
from .uio import _s, _u, UnicodeIO, defaultIO
|
|
from .encdetect import detect_line_encoding, guess_line_encoding
|
|
from .functions import apply_args
|
|
|
|
class BLines(list):
|
|
"""Une liste de chaines de type str.
|
|
"""
|
|
_valid = False
|
|
def is_valid(self): return self._valid
|
|
valid = property(is_valid)
|
|
|
|
_nl = None
|
|
def get_nl(self): return self._nl
|
|
def set_nl(self, nl): self._nl = nl
|
|
nl = property(get_nl, set_nl)
|
|
|
|
def __strip_nl(self, line):
|
|
nl = ''
|
|
while line[-1:] in ('\r', '\n'):
|
|
nl = line[-1:] + nl
|
|
line = line[:-1]
|
|
if self._nl is None and nl:
|
|
self._nl = nl
|
|
return line
|
|
|
|
def __add_nl(self, line):
|
|
if self._nl is None: self._nl = os.linesep
|
|
return line + self._nl
|
|
|
|
def __init__(self, lines=None, nl=None):
|
|
if nl is not None: self._nl = nl
|
|
if lines is None:
|
|
pass
|
|
elif isseq(lines):
|
|
self[:] = map(self.__strip_nl, lines)
|
|
self._valid = True
|
|
if self._nl is None: self._nl = os.linesep
|
|
elif isstr(lines):
|
|
self.appendtext(lines)
|
|
if self._nl is None: self._nl = os.linesep
|
|
else:
|
|
raise ValueError("Unsupported type: %s" % type(lines))
|
|
|
|
def reset(self):
|
|
"""Vider toutes les lignes de cet objet.
|
|
"""
|
|
self[:] = []
|
|
self._valid = False
|
|
self._nl = None
|
|
|
|
def _new(self, list=None):
|
|
u"""Retourner une instance vide de cette classe, avec les mêmes paramètres pour nl.
|
|
"""
|
|
return self.__class__(list, self._nl)
|
|
|
|
def copy(self):
|
|
"""Faire et retourner une copie de cet objet.
|
|
"""
|
|
return self._new(self)
|
|
|
|
RE_NLs = re.compile(r'(\r?\n|\r)')
|
|
def appendtext(self, text, strip_last_nl=False):
|
|
"""Ajouter un texte éventuellement sur plusieurs lignes.
|
|
|
|
Si strip_last_nl, ignorer la dernière ligne vide.
|
|
|
|
Equivalent en gros à self.extend(test.split(r'\r?\n|\r'))
|
|
|
|
@return: self
|
|
"""
|
|
lines = self.RE_NLs.split(text)
|
|
if strip_last_nl and not lines[-1]: del lines[-1]
|
|
i, max = 0, len(lines) - 1
|
|
while i <= max:
|
|
if i == max:
|
|
line = lines[i]
|
|
i += 1
|
|
else:
|
|
line = lines[i] + lines[i + 1]
|
|
i += 2
|
|
self.append(self.__strip_nl(line))
|
|
return self
|
|
|
|
def _after_readlines(self, lines, uio):
|
|
return lines
|
|
|
|
def readlines(self, inf, raise_exception=True, uio=None, open_func=None, until=None):
|
|
"""Lire les lignes du fichier.
|
|
|
|
@param until: fonction qui si elle retourne True, arrête la lecture. La
|
|
ligne qui arrête la lecture est incluse dans la liste. La signature
|
|
de la fonction est until(line[, lines]).
|
|
@return: self
|
|
"""
|
|
self.reset()
|
|
close_inf = False
|
|
if isstr(inf):
|
|
if open_func is None: open_func = open
|
|
inf = open_func(inf, 'rb')
|
|
close_inf = True
|
|
try:
|
|
try:
|
|
if until:
|
|
lines = []
|
|
while True:
|
|
line = inf.readline()
|
|
if not line: break
|
|
line = self.__strip_nl(line)
|
|
lines.append(line)
|
|
if apply_args(until, line, lines): break
|
|
else:
|
|
lines = map(self.__strip_nl, inf.readlines())
|
|
lines = self._after_readlines(lines, uio)
|
|
self.extend(lines)
|
|
self._valid = True
|
|
except IOError:
|
|
if raise_exception: raise
|
|
finally:
|
|
if close_inf: inf.close()
|
|
if self._nl is None: self._nl = os.linesep
|
|
return self
|
|
|
|
def _before_writelines(self, lines, uio):
|
|
if uio is None: uio = defaultIO
|
|
return map(uio.s, lines)
|
|
|
|
def writelines(self, outf, uio=None, open_func=None):
|
|
"""Ecrire les lignes dans le fichier
|
|
"""
|
|
close_outf = False
|
|
if isstr(outf):
|
|
if open_func is None: open_func = open
|
|
outf = open_func(outf, 'wb')
|
|
close_outf = True
|
|
try:
|
|
lines = self._before_writelines(self, uio)
|
|
outf.writelines(map(self.__add_nl, lines))
|
|
finally:
|
|
if close_outf: outf.close()
|
|
|
|
def _compile(self, forp):
|
|
if isstr(forp): forp = re.compile(forp)
|
|
return forp
|
|
|
|
def __apply(self, func, line, index, lines, args):
|
|
f = func
|
|
ac_offset = 0
|
|
if type(f) is MethodType:
|
|
f = f.im_func
|
|
ac_offset = 1
|
|
if type(f) is not FunctionType:
|
|
raise ValueError("func must be a function")
|
|
argcount = f.func_code.co_argcount - ac_offset
|
|
args = (line, index, lines, args)[:argcount]
|
|
return func(*args)
|
|
|
|
def _matches(self, forp, line, index=None, lines=None, args=None):
|
|
if callable(forp): return self.__apply(forp, line, index, lines, args)
|
|
else: return forp.match(line) is not None
|
|
|
|
def grepi(self, forp, indexes=None, inverse=False, boundaries=False, **args):
|
|
"""forp étant une fonction ou une expression régulière, retourner une
|
|
liste d'index de lignes pour lesquelles la fonction retourne True ou
|
|
qui correspondent à l'expression régulière forp.
|
|
|
|
Si inverse==True, la logique est inversée (on ne retourne que les
|
|
indexes des lignes qui ne matches pas.)
|
|
|
|
Si indexes n'est pas None, la recherche se limite aux lignes dont
|
|
l'index est dans cette liste. Sinon, si boundaries==True, ne rechercher
|
|
qu'au début et à la fin de la liste (i.e. s'arrêter de part et d'autre
|
|
dès que la fonction retourne False ou que le pattern ne matche pas.)
|
|
|
|
@return: une liste d'indexes
|
|
@rtype: list
|
|
"""
|
|
if indexes is None:
|
|
indexes = range(len(self))
|
|
else:
|
|
# ne pas chercher sur les boundaries si on donne indexes
|
|
boundaries = False
|
|
|
|
forp = self._compile(forp)
|
|
min = 0
|
|
matches = []
|
|
for index in indexes:
|
|
line = self[index]
|
|
found = self._matches(forp, line, index, self, args)
|
|
if inverse: found = not found
|
|
if found:
|
|
matches.append(index)
|
|
elif boundaries:
|
|
min = index
|
|
break
|
|
if boundaries:
|
|
index = len(self) - 1
|
|
pos = len(matches)
|
|
while index > min:
|
|
line = self[index]
|
|
found = self._matches(forp, line, index, self, args)
|
|
if inverse: found = not found
|
|
if found:
|
|
matches.insert(pos, index)
|
|
index -= 1
|
|
else:
|
|
break
|
|
return matches
|
|
|
|
def grep(self, forp, indexes=None, inverse=False, boundaries=False, **args):
|
|
"""forp étant une fonction ou une expression régulière, retourner les
|
|
lignes pour lesquelles la fonction retourne True ou qui correspondent
|
|
à l'expression régulière forp.
|
|
|
|
Consulter l'aide de la fonction grepi pour des détails sur les
|
|
paramètres.
|
|
|
|
@return: une liste de chaines
|
|
@rtype: list
|
|
"""
|
|
indexes = self.grepi(forp, indexes, inverse, boundaries, **args)
|
|
return self._new(map(lambda i: self[i], indexes))
|
|
|
|
def replace(self, pattern, repl, count=0, indexes=None):
|
|
"""Remplacer dans toute les lignes dont l'index est dans la liste
|
|
indexes (toutes les lignes par défaut), l'expression régulière pattern
|
|
par repl.
|
|
|
|
@return: le nombre de remplacements effectués
|
|
@rtype: int
|
|
"""
|
|
if indexes is None: indexes = range(len(self))
|
|
|
|
pattern = self._compile(pattern)
|
|
nbrepl = 0
|
|
for index in indexes:
|
|
line = self[index]
|
|
line, nb = pattern.subn(repl, line, count)
|
|
self[index] = line
|
|
nbrepl = nbrepl + nb
|
|
|
|
return nbrepl
|
|
|
|
def map(self, func, copy=False, **args):
|
|
"""Pour chacune des lignes, appeler la fonction func avec les arguments
|
|
suivants dans cet ordre:
|
|
|
|
line = la ligne
|
|
index = l'index dans la liste
|
|
lines = cet objet (ou sa copie)
|
|
args = le dictionnaire des paramètres supplémentaires
|
|
|
|
Si copy==True, opérer sur une copie de cet objet, sinon les lignes sont
|
|
modifiées en place.
|
|
|
|
@return: la liste résultat
|
|
@rtype: list
|
|
"""
|
|
if copy: lines = self.copy()
|
|
else: lines = self
|
|
|
|
for index in range(len(lines)):
|
|
lines[index] = self.__apply(func, lines[index], index, lines, args)
|
|
return lines
|
|
|
|
def filter(self, forp, copy=False, inverse=False, boundaries=False, **args):
|
|
"""Pour chacune des lignes, essayer de matcher avec l'expression
|
|
régulière forp, ou appeler la fonction forp avec les arguments
|
|
suivants dans cet ordre:
|
|
|
|
line = la ligne
|
|
index = l'index dans la liste
|
|
lines = cet objet (ou sa copie)
|
|
args = le dictionnaire des paramètres supplémentaires
|
|
|
|
Si l'expression régulière matche, ou si la fonction retourne True,
|
|
la valeur est gardée, sinon elle est supprimée. Cette logique est
|
|
inversée si inverse==True.
|
|
|
|
Si copy==True, opérer sur une copie de cet objet, sinon les lignes sont
|
|
modifiées en place. Si boundaries==True, n'opérer qu'au début et à la
|
|
fin de la liste (i.e. s'arrêter de part et d'autre dès que la fonction
|
|
retourne True.)
|
|
|
|
@return: la liste résultat
|
|
@rtype: list
|
|
"""
|
|
if copy: lines = self.copy()
|
|
else: lines = self
|
|
|
|
forp = self._compile(forp)
|
|
index = 0
|
|
min = 0
|
|
max = len(lines)
|
|
while index < max:
|
|
line = lines[index]
|
|
match = self._matches(forp, line, index, lines, args)
|
|
if inverse: match = not match
|
|
if match:
|
|
if boundaries:
|
|
min = index
|
|
break
|
|
else:
|
|
index += 1
|
|
else:
|
|
del lines[index]
|
|
max -= 1
|
|
if boundaries:
|
|
index = len(lines) - 1
|
|
while index > min:
|
|
line = lines[index]
|
|
match = self._matches(forp, line, index, lines, args)
|
|
if inverse: match = not match
|
|
if match:
|
|
break
|
|
else:
|
|
del lines[index]
|
|
index -= 1
|
|
return lines
|
|
|
|
def join(self, lastnl=False):
|
|
"""Obtenir les lignes jointes avec self.nl
|
|
|
|
Si lastnl==True, la dernière ligne se termine par nl. Cela est
|
|
approprié quand on veut par exemple générer un fichier.
|
|
"""
|
|
if lastnl:
|
|
return ''.join(map(self.__add_nl, self))
|
|
else:
|
|
nl = self._nl
|
|
if nl is None: nl = os.linesep
|
|
return nl.join(self)
|
|
|
|
class Lines(BLines):
|
|
"""Une liste de chaines de type unicode.
|
|
"""
|
|
__use_defaultIO = []
|
|
__dont_convert = []
|
|
def detect_encoding(lines):
|
|
encoding = detect_line_encoding(lines)
|
|
if encoding is None:
|
|
encoding = guess_line_encoding(lines,
|
|
Lines.__use_defaultIO,
|
|
Lines.__dont_convert)
|
|
return encoding
|
|
detect_encoding = staticmethod(detect_encoding)
|
|
|
|
_uio = None
|
|
def get_uio(self): return self._uio
|
|
def set_uio(self, uio): self._uio = uio
|
|
uio = property(get_uio, set_uio)
|
|
|
|
_encoding_detector = detect_encoding
|
|
def get_encoding_detector(self): return self._encoding_detector
|
|
def set_encoding_detector(self, encoding_detector): self._encoding_detector = encoding_detector
|
|
encoding_detector = property(get_encoding_detector, set_encoding_detector)
|
|
|
|
def __init__(self, lines=None, nl=None, uio=None, encoding_detector=None):
|
|
if uio is not None: self._uio = uio
|
|
if encoding_detector is not None:
|
|
self._encoding_detector = encoding_detector
|
|
BLines.__init__(self, lines, nl)
|
|
|
|
def _after_readlines(self, lines, uio):
|
|
set_uio = uio is None
|
|
if uio is None: uio = self._uio
|
|
if uio is None:
|
|
encoding_detector = self._encoding_detector
|
|
if encoding_detector is not None:
|
|
encoding = encoding_detector(lines)
|
|
if encoding is self.__use_defaultIO:
|
|
uio = defaultIO
|
|
if set_uio: self._uio = uio
|
|
elif encoding is self.__dont_convert:
|
|
pass
|
|
else:
|
|
uio = UnicodeIO(encoding)
|
|
if set_uio: self._uio = uio
|
|
if uio is not None:
|
|
lines = map(uio.u, lines)
|
|
return lines
|
|
|
|
def _before_writelines(self, lines, uio):
|
|
if uio is None: uio = self._uio
|
|
if uio is None: uio = defaultIO
|
|
return map(uio.s, lines)
|