#!/usr/bin/env python # -*- coding: utf-8 -*- u"""%(scriptname)s: gérer une liste de tâches USAGE %(scriptname)s [options] list|add|edit|do|commit|remove|purge|report|update [args] OPTIONS -c tasksrc Spécifier le fichier de configuration Utilisez l'option --help avec chacune des commandes pour avoir une description plus complète de leur utilisation""" __all__ = ('Task', 'TextFile', 'TWFile', 'TasksrcFile', 'TasksCLI') import sys, os, re from os import path from getopt import getopt from types import StringType, UnicodeType, ListType, TupleType from pyutools.dates import datef from pyutools.config import ShConfigFile from pyutools import scriptdir, scriptname, dict, ensure_unicode from pyutools import uprint, get_color, eerror, ewarn, enote from pyutools import edit_template from TiddlyWiki import Tiddler, TiddlyWiki _marker = [] DEFAULT_ENCODING = 'iso-8859-1' # propriétés standard d'une tâche ID = 'id' TITLE = 'title' DESC = 'desc' TEXT = 'text' ENCODING = 'encoding' DATE = 'date' # date de création TYPE = 'type' # type de tâche. par défaut, vaut 'TODO' # propriétés pour une tâche de type TODO TODO_TYPE = 'TODO' DONE = 'done' # date de fin de tâche RELEASED_SUFFIX = '_released' # date de release du projet PURGED_SUFFIX = '_purged' # date de purge du fichier pour le projet # propriétés pour une tâche de type IDEA IDEA_TYPE = 'IDEA' class Task: # ligne telle qu'elle est écrite dans le fichier line = None # le texte, les propriétés, les projets, les contextes text = None props = None projects = None contexts = None # cet objet a-t-il été modifié? modified = False # numero d'index de cet tâche dans la liste des tâches index = None encoding = DEFAULT_ENCODING def ensure_unicode(self, value): return ensure_unicode(value, self.encoding) def quote_nl(self, text): if text is not None: text = text.replace('\\', r'\\') text = text.replace('\n', r'\n') return text def unquote_nl(self, text): if text is not None: text = text.replace(r'\n', '\n') text = text.replace(r'\\', '\\') return text ################################################################## def __init__(self, line=None, **kw): self.text = u'' self.props = {} self.projects = [] self.contexts = [] if line is not None: self.fromline(line) for name, value in kw.items(): self[name] = value TAGS_PATTERN = re.compile(r'\s*\{\s*((?:(?:\+\S+\s*)?(?:@\S+\s*)?(?:\S+=\S+\s*)?)*)\}\s*$') PROP_PATTERN = re.compile(r'(\S+)=(\S+)') PROJ_PATTERN = re.compile(r'\+(\S+)\s*') CTXT_PATTERN = re.compile(r'@(\S+)\s*') def fromline(self, line): """Initialiser cet objet à partir d'une ligne d'un fichier. """ mo = self.TAGS_PATTERN.search(line) if mo is not None: tags = mo.group(1) props = self.PROP_PATTERN.findall(tags) for name, value in props: if name == ENCODING: self[name] = value for name, value in props: if name != ENCODING: self[name] = value self.projects = [] for project in self.PROJ_PATTERN.findall(tags): self.projects.append(self.ensure_unicode(project)) self.contexts = [] for context in self.CTXT_PATTERN.findall(tags): self.contexts.append(self.ensure_unicode(context)) self.text = self.ensure_unicode(self.unquote_nl(line[:mo.start(0)])) else: self.text = self.ensure_unicode(self.unquote_nl(line)) def toline(self): """Générer une ligne d'un fichier à partir de cet objet. """ if self.line is None: line = self.quote_nl(self.text or u'') if self.props: prefix = u' {' for project in self.projects: line += u'%s+%s' % (prefix, project) prefix = u' ' for context in self.contexts: line += u'%s@%s' % (prefix, context) prefix = u' ' for name, value in self.props.items(): line += u'%s%s=%s' % (prefix, name, value) prefix = u' ' line += u'}' self.line = line.encode(self.encoding) return self.line __repr__ = toline __str__ = toline def update_from(self, t): for n in ('line', 'text', 'props', 'projects', 'contexts'): tp = getattr(self, n) op = getattr(t, n) if tp != op: setattr(self, n, op) self.modified = True # Gestion des propriétés def __eq__(self, other): return self is other def __len__(self): return len(self.props) def __getattr__(self, name): if name in self: return self[name] else: raise NameError, name def __getitem__(self, name, default=_marker): if name in self.METH_PROPERTIES: return getattr(self, 'get_' + name)() else: if default is _marker: return self.props[name] else: return self.props.get(name, default) get = __getitem__ def __delitem__(self, name): if name in self.METH_PROPERTIES: getattr(self, 'del_' + name)() else: del self.props[name] self.line = None self.modified = True def __setitem__(self, name, value): if name in self.METH_PROPERTIES: getattr(self, 'set_' + name)(value) else: self.props[name] = self.ensure_unicode(value) self.line = None self.modified = True def is_modified(self): return self.modified def __contains__(self, key): return key in self.METH_PROPERTIES or key in self.props def has_key(self, key): return key in self.METH_PROPERTIES or self.props.has_key(key) def keys(self): return self.props.keys() def values(self): return self.props.values() def items(self): return self.props.items() def clear(self): self.props.clear() def update(self, props=None, **kw): if props is not None: for key, value in props.items(): self[key] = value for key, value in kw.items(): self[key] = value # Propriétés gérées par des méthodes METH_PROPERTIES = [TITLE, DESC, TEXT, ENCODING] def get_title(self): pos = self.text.find('\n\n') if pos != -1: return self.text[:pos] else: return self.text def del_title(self): self.set_title('') def set_title(self, title): title = self.ensure_unicode(title) desc = self.get_desc() self.text = title + (desc and u'\n\n' + desc or '') self.line = None self.modified = True def get_desc(self): text = self.text or u'' pos = text.find('\n\n') if pos != -1: return text[pos + 2:] else: return None def del_desc(self): self.set_desc('') def set_desc(self, desc): title = self.get_title() desc = self.ensure_unicode(desc) self.text = title + (desc and u'\n\n' + desc or '') self.line = None self.modified = True def get_text(self): return self.text def del_text(self): self.set_text('') def set_text(self, text): self.text = self.ensure_unicode(text) self.line = None self.modified = True def get_encoding(self): return self.encoding def del_encoding(self): self.encoding = DEFAULT_ENCODING del self.props[ENCODING] self.modified = True def set_encoding(self, encoding): self.encoding = encoding self.props[ENCODING] = encoding self.modified = True # Opérations de haut niveau def has_id(self): return self.has_key(ID) def get_id(self): return self.get(ID, None) def set_id(self, id=None): if id is None: if self.has_key(ID): del self[ID] else: self[ID] = id def has_date(self): return self.has_key(DATE) def get_date(self): return self.get(DATE, None) def set_date(self, date=None): if date is None: date = datef() self[DATE] = date def get_type(self): return self.get(TYPE, TODO_TYPE) def set_type(self, type=None): if type is not None: type = type.upper() if type in ('T', 'TODO'): type = TODO_TYPE elif type in ('I', 'IDEA'): type = IDEA_TYPE if type is None or type == TODO_TYPE: if self.has_key(TYPE): del self[TYPE] else: self[TYPE] = type def is_todo_type(self): return self.get_type() == TODO_TYPE def is_done(self): return self.has_key(DONE) def set_done(self, date=None): if date is None: date = datef() self[DONE] = date def is_idea_type(self): return self.get_type() == IDEA_TYPE def has_projects(self): return len(self.projects) != 0 def get_projects(self): return self.projects def set_projects(self, projects): self.projects = list(projects) self.modified = True def has_project(self, project): return project in self.projects def add_project(self, project): self.projects.append(project) def remove_project(self, project): self.projects.remove(project) def is_project_purged(self, project): return self.has_key(project + PURGED_SUFFIX) def set_project_purged(self, project, date=None): if date is None: date = datef() self[project + PURGED_SUFFIX] = date def is_all_purged(self, is_project_func=None): for project in self.projects: if is_project_func is not None: if not is_project_func(project): continue if not self.is_project_purged(project): return False return True def is_project_released(self, project): return self.has_key(project + RELEASED_SUFFIX) def set_project_released(self, project, date=None): if date is None: date = datef() self[project + RELEASED_SUFFIX] = date def is_all_released(self): for project in self.projects: if not self.is_project_released(project): return False return True def has_contexts(self): return len(self.contexts) != 0 def get_contexts(self): return self.contexts def set_contexts(self, contexts): self.contexts = list(contexts) self.modified = True def add_context(self, context): self.contexts.append(context) def remove_context(self, context): self.contexts.remove(context) class TasksFile: """Classe de base des fichiers contenant des tâches. """ valid = False tasks = None props = None def update_indexes(self): index = 1 for task in self.tasks: task.index = index index = index + 1 def update_ids(self): known_ids = {} for task in self.tasks: if task.has_id(): known_ids[task.get_id()] = None lastid = self.get_lastid() for task in self.tasks: if not task.has_key(ID): id = lastid + 1 if known_ids.has_key(id): while known_ids.has_key(id): id = id + 1 task[ID] = id lastid = id self.set_lastid(lastid) def load(self, file, raise_exception=True): pass def save(self, file=None, raise_exception=True): pass def is_valid(self): return self.valid def get_file(self): pass def get_tasks(self): return self.tasks def __len__(self): return len(self.tasks) def __getitem__(self, index): return self.tasks[index] def __delitem__(self, index): del self.tasks[index] self.update_indexes() def __setitem__(self, index, task): self.tasks[index] = task self.update_ids() def add(self, task): self.tasks.append(task) self.update_indexes() self.update_ids() def remove(self, task_or_id): if isinstance(task_or_id, Task): task = task_or_id else: task = self.get_by_id(task_or_id) self.tasks.remove(task) self.update_indexes() self.update_ids() def __filter_indexes(self, tasks, indexes): if not indexes: return tasks filtered = [] maxindex = max(map(lambda t: t.index, tasks)) for i in indexes: if i > 0 and i <= maxindex: filtered.append(tasks[i - 1]) return filtered def __filter_contexts(self, tasks, contexts): if not contexts: return tasks if '*' in contexts: return tasks filtered = [] for t in tasks: for c in contexts: if not t.has_context(c): break else: filtered.append(t) return filtered def __filter_projects(self, tasks, projects): if not projects: return tasks if '*' in projects: return tasks filtered = [] for t in tasks: for p in projects: if not t.has_project(p): break else: filtered.append(t) return filtered def __filter_patterns(self, tasks, patterns): if not patterns: return tasks filtered = [] for t in tasks: for p in patterns: if not re.search(p, t.get_text()): break else: filtered.append(t) return filtered def __filter_props(self, tasks, props): if not props: return tasks filtered = [] for t in tasks: for name, value in props.items(): if value == '*': if not t.has_key(name): break elif value == '': if t.has_key(name): break else: if not t.has_key(name) or t[name] != value: break else: filtered.append(t) return filtered def filter(self, indexes=None, projects=None, contexts=None, patterns=None, props=None, **kw): filtered = self.__filter_indexes(self.get_tasks(), indexes) filtered = self.__filter_contexts(filtered, contexts) filtered = self.__filter_projects(filtered, projects) filtered = self.__filter_patterns(filtered, patterns) if props is None: props = kw else: props.update(kw) filtered = self.__filter_props(filtered, props) return filtered def get_by_id(self, id): filtered = self.filter(props={ID: id}) if len(filtered) > 0: return filtered[0] else: return None def has_props(self): return len(self.props) != 0 def has_prop(self, name): return self.props.has_key(name) def get_prop(self, name, default=_marker): value = self.props.get(name, default) if value is _marker: value = None return value def set_prop(self, name, value): self.props[name] = value # Propriétés standard d'un fichier texte TF_LASTID = 'lastid' TF_ENCODING = 'encoding' # encoding par défaut du fichier class TextFile(TasksFile): """Fichier de tâches au format texte. """ TASKS_TXT = ['TASKS.txt', 'tasks.txt'] PURGED_TXT = ['PURGED.txt', 'purged.txt'] FILENAMES = [TASKS_TXT, PURGED_TXT] TASKS_FILETYPE = 0 PURGED_FILETYPE = 1 valid = False file = pf = dirname = filename = None tasks = None props = None def __init__(self, src=None, filetype=TASKS_FILETYPE, raise_exception=False): self.tasks = [] self.props = {} filenames = self.FILENAMES[filetype] if src is None: # Mettre en place les éléments pour qu'une sauvegarde écrive dans le fichier # du type spécifié. file = filenames[0] self.__update_file(file) self.valid = True else: # Sinon on essaye de charger le fichier if path.isdir(src): dir = src file = path.join(dir, filenames[0]) if not path.exists(file): file2 = path.join(dir, filenames[1]) if path.exists(file2): file = file2 else: file = src self.load(file, raise_exception=raise_exception) def __update_file(self, file): self.file = file self.pf = path.abspath(self.file) self.dirname, self.filename = path.split(self.pf) def __strip_nl(self, line): if line.endswith('\r\n'): return line[:-2] elif line.endswith('\n'): return line[:-1] elif line.endswith('\r'): return line[:-1] else: return line def __add_nl(self, line): return line + '\n' PROP_PATTERN = re.compile(r'(\S+)=(\S+)') def __load_props(self, line): for name, value in self.PROP_PATTERN.findall(line): self.set_prop(name, value) def load(self, file, raise_exception=True): self.__update_file(file) try: self.tasks = [] self.valid = False inf = open(self.pf, 'rb') try: lines = map(self.__strip_nl, inf.readlines()) self.valid = True finally: inf.close() except IOError: if raise_exception: raise return False if lines[0:1] and lines[0].startswith("##"): self.__load_props(lines[0]) lines = lines[1:] for line in lines: t = Task(line) t.modified = False self.tasks.append(t) self.update_indexes() self.update_ids() return True def save(self, file=None, raise_exception=True): if file is not None: self.__update_file(file) self.valid = False try: # écrire dans un ficher temporaire d'abord tmpf = self.pf + '.tmp' outf = open(tmpf, 'wb') try: lines = [] if self.has_props(): line = "##" for name, value in self.props.items(): line += " %s=%s" % (name, value) lines.append(line) lines.extend(map(lambda t: t.toline(), self.tasks)) outf.writelines(map(self.__add_nl, lines)) finally: outf.close() try: os.rename(tmpf, self.pf) except OSError: os.remove(self.pf) os.rename(tmpf, self.pf) self.valid = True return True except IOError: if raise_exception: raise return False def get_file(self): return self.pf def get_lastid(self): return int(self.get_prop(TF_LASTID, 0)) def set_lastid(self, lastid): self.set_prop(TF_LASTID, int(lastid)) class TWFile(TasksFile): """Fichier de tâches au format TiddlyWiki """ valid = False file = pf = dirname = filename = None twfile = None tasks = None props = None def __init__(self, src=None, raise_exception=False): self.tasks = [] self.props = {} if src is None: twfile = TiddlyWiki(src, raise_exception=False) if twfile.is_valid(): self.load(twfile=twfile) elif raise_exception: raise IOError("Impossible de charger le TiddlyWiki associé") else: self.load(src, raise_exception=raise_exception) def __update_file(self, file): self.file = file self.pf = path.abspath(self.file) self.dirname, self.filename = path.split(self.pf) def load(self, file, raise_exception=True): self.valid = False # XXX continuer ici twfile = TiddlyWiki(src, raise_exception=False) def save(self, file=None, raise_exception=True): pass class ProxyTask: __task = None index = None def __init__(self, task, index): self.__task = task self.index = index def __getattr__(self, name, default=_marker): if default is _marker: return getattr(self.__task, name) else: return getattr(self.__task, name, default) class FileAggregate(TasksFile): __tfs = None def __init__(self, tfs): self.__tfs = [] for tf in tfs: self.__tfs.append(tf) def load(self, file, raise_exception=True): raise NotImplementedError def save(self, file=None, raise_exception=True): for tf in self.__tfs: tf.save(file, raise_exception) def __len__(self): return reduce(lambda n, m: n + m, map(lambda t: len(t), self.__tfs)) def __get_tfindex(self, index): tfi = index for tf in self.__tfs: if tfi < len(tf): return tf, tfi tfi -= len(tf) raise IndexError, index def __getitem__(self, index): tf, tfi = self.__get_tfindex(index) return ProxyTask(tf[tfi], index) def __delitem__(self, index): tf, tfi = self.__get_tfindex(index) del tf[tfi] def __setitem__(self, index, task): tf, tfi = self.__get_tfindex(index) tf[tfi] = task def add(self, task): raise NotImplementedError def remove(self, task): raise NotImplementedError def filter(self, indexes=None, projects=None, contexts=None, props=None, **kw): offset = 0 filtered = [] for tf in self.__tfs: for t in tf.filter(indexes, projects, contexts, props, **kw): filtered.append(ProxyTask(t, t.index + offset)) offset += len(tf) return filtered def has_props(self): for tf in self.__tfs: if tf.has_props(): return True return False def has_prop(self, name): for tf in self.__tfs: if tf.has_prop(name): return True return False def get_prop(self, name, default=_marker): for tf in self.__tfs: if tf.has_prop(name): return tf.get_prop(name) if default is _marker: return None else: return default def set_prop(self, name, value): raise NotImplementedError def get_lastid(self): raise NotImplementedError def set_lastid(self, lastid): raise NotImplementedError def is_valid(self): raise NotImplementedError def get_file(self): raise NotImplementedError ###################################################################### class TasksrcFile(ShConfigFile): TASKSRC = "tasksrc" def __init__(self, file=None, raise_exception=True): if file is None: testdir = path.join(scriptdir, "test") utoolsrcdir = path.join(path.expanduser("~"), ".utools") if path.isdir(testdir): file = path.join(testdir, self.TASKSRC) else: file = path.join(utoolsrcdir, self.TASKSRC) raise_exception = False ShConfigFile.__init__(self, file=file, raise_exception=raise_exception) TASKSDIR = "tasksdir" PROJDIRS = "projdirs" TASKSFILE = "tasksfile" PURGEDFILE = "purgedfile" DONE_COLOR = "done_color" IDEA_COLOR = "idea_color" PROJECT_COLOR = "project_color" CONTEXT_COLOR = "context_color" TAG_COLOR = "tag_color" def __p(self, p, refdir=None): if refdir is not None: if not path.isdir(refdir): refdir = path.split(refdir)[0] p = path.normpath(path.expanduser(p)) if refdir is not None: p = path.join(refdir, p) return p def get_tasksdir(self): """Retourner le répertoire de base pour les fichiers de tâches """ if self.has_key(self.TASKSDIR): return self.__p(self[self.TASKSDIR]) else: return self.dirname def set_tasksdir(self, tasksdir=None): if tasksdir is None: if self.has_key(self.TASKSDIR): del self[self.TASKSDIR] else: self[self.TASKSDIR] = tasksdir def get_projdirs(self): """Retourner la liste des répertoires de base de projets """ if self.has_key(self.PROJDIRS): return map(lambda p: self.__p(p, self.get_tasksdir()), self.get_paths(self.PROJDIRS)) else: return () def set_projdirs(self, projdirs=None): if projdirs is None: if self.has_key(self.PROJDIRS): del self[self.PROJDIRS] else: if type(projdirs) is not StringType: projdirs = path.pathsep.join(projdirs) self[self.PROJDIRS] = projdirs def get_tasksfile(self, default=_marker): """Retourner tasksfile s'il est défini, None sinon """ if self.has_key(self.TASKSFILE): return self.__p(self[self.TASKSFILE], self.get_tasksdir()) elif default is _marker: return None else: return default def get_taskssrc(self): """Retourner tasksfile s'il est défini, tasksdir sinon """ return self.get_tasksfile(self.get_tasksdir()) def set_tasksfile(self, tasksfile=None): if tasksfile is None: if self.has_key(self.TASKSFILE): del self[self.TASKSFILE] else: self[self.TASKSFILE] = tasksfile def get_purgedfile(self, default=_marker): """Retourner purgedfile s'il est défini, None sinon """ if self.has_key(self.PURGEDFILE): return self.__p(self[self.PURGEDFILE], self.get_tasksdir()) elif default is _marker: return None else: return default def get_purgedsrc(self): """Retourner purgedfile s'il est défini, tasksdir sinon """ return self.get_purgedfile(self.get_tasksdir()) def set_purgedfile(self, purgedfile=None): if purgedfile is None: if self.has_key(self.PURGEDFILE): del self[self.PURGEDFILE] else: self[self.PURGEDFILE] = purgedfile def get_done_color(self): return self.get(self.DONE_COLOR, "faint") def set_done_color(self, done_color): self[self.DONE_COLOR] = done_color def get_idea_color(self): return self.get(self.IDEA_COLOR, "yellow") def set_idea_color(self, idea_color): self[self.IDEA_COLOR] = idea_color def get_project_color(self): return self.get(self.PROJECT_COLOR, "blue") def set_project_color(self, project_color): self[self.PROJECT_COLOR] = project_color def get_context_color(self): return self.get(self.CONTEXT_COLOR, "blue") def set_context_color(self, context_color): self[self.CONTEXT_COLOR] = context_color def get_tag_color(self): return self.get(self.TAG_COLOR, "faint,blue") def set_tag_color(self, tag_color): self[self.TAG_COLOR] = tag_color class TasksCLI: tasksrc = None tasksfile = None def __tf(self): # retourner le fichier pour les tâches if self.tasksfile is None: tasksfile = self.tasksrc.get_tasksfile() if tasksfile is not None: tasksfile = TextFile(tasksfile) else: tasksfile = TextFile(self.tasksrc.get_taskssrc(), TextFile.TASKS_FILETYPE) self.tasksfile = tasksfile return self.tasksfile purgedfile = None def __pf(self): # retourner le fichier pour les tâches purgées if self.purgedfile is None: purgedfile = self.tasksrc.get_purgedfile() if purgedfile is not None: purgedfile = TextFile(purgedfile) else: purgedfile = TextFile(self.tasksrc.get_purgedsrc(), TextFile.PURGED_FILETYPE) self.purgedfile = purgedfile return self.purgedfile def __init__(self, tasksrc=None): if not isinstance(tasksrc, TasksrcFile): tasksrc = TasksrcFile(tasksrc) self.tasksrc = tasksrc CONFOPT = 'c:' CONFLOPT = ['config=', 'tasksdir=', 'projdirs=', 'tasksfile=', 'purgedfile='] def is_global_option(self, opt, value): if opt in ('-c', '--config'): self.tasksrc = TasksrcFile(value) self.tasksfile = None self.purgedfile = None elif opt in ('--tasksdir', ): self.tasksrc.set_tasksdir(value) elif opt in ('--projdirs', ): self.tasksrc.set_projdirs(value) elif opt in ('--tasksfile', ): self.tasksfile = None self.tasksrc.set_tasksfile(value) elif opt in ('--purgedfile', ): self.purgedfile = None self.tasksrc.set_purgedfile(value) else: return False return True def get_projdir(self, dir): """Si dir est un répertoire absolu, retourner le nom de projet correspondant, ou None si ce n'est pas un répertoire de projet. Sinon, retourner le répertoire absolu correspondant au répertoire de projet """ projdirs = self.tasksrc.get_projdirs() if path.isabs(dir): # dir est un répertoire absolu. retourner le nom de projet # correspondant maxlen = 0 project = None if dir not in projdirs: for projdir in projdirs: if dir.startswith(projdir + '/'): p = dir[len(projdir) + 1:] i = p.find('/') if i != -1: p = p[:i] pd = path.join(projdir, p) if len(pd) > maxlen: maxlen = len(pd) project = p return project else: # dir est un nom de projet. retourner le répertoire de projet # correspondant p = dir for projdir in projdirs: pd = path.join(projdir, p) if path.isdir(pd): return pd return None def get_project(self): """Retourner le nom du projet si on est dans un répertoire de projet, None sinon """ return self.get_projdir(os.getcwd()) def is_project(self, project): """Retourner True si le project existe physiquement (le répertoire correspondant est trouvé sur le disque.) """ return self.get_projdir(project) is not None def get_tf(self, project): """Obtenir le fichier de tâche de project """ projdir = self.get_projdir(project) if projdir is None: return None elif path.exists(path.join(projdir, 'TODO.html')): return TWFile(projdir) else: return TextFile(projdir, TextFile.TASKS_FILETYPE) def update_projects(self, delete=None, add_or_update=None, delete_projects=None, delete_ids=None): """Mettre à jour les tâches dans les répertoires de projet delete est une liste de tâches qui doivent être supprimées dans les répertoires de projet. add_or_update est une liste de tâches qui doivent être ajoutées ou mises à jour dans les répertoires de projet. delete_projects est la liste des projets qui ont été supprimés de ces tâches. delete_ids est une liste d'ids pour lesquels les tâches associées doivent être supprimées avant d'être mises à jour. On fait dans l'ordre: delete, delete_ids, add_or_update, delete_projects """ tfs = {} def get_tf(p): tf = tfs.get(p, None) if tf is None: tf = self.get_tf(p) if tf is not None: tfs[p] = tf return tf def remove_from_tf(p, t): tf = get_tf(p) if tf is not None: tf.remove(t.get_id()) def update_in_tf(p, t): tf = get_tf(p) if tf is not None: pt = tf.get_by_id(t.get_id()) if pt is not None: pt.update_from(t) else: tf.add(t) def norm_seq(o): if o is not None: if type(o) not in (ListType, TupleType): o = (o, ) return o delete = norm_seq(delete) add_or_update = norm_seq(add_or_update) delete_projects = norm_seq(delete_projects) delete_ids = norm_seq(delete_ids) if delete: for t in delete: for p in t.get_projects(): remove_from_tf(p, t) if add_or_update: if delete_ids: for t in add_or_update: for p in t.get_projects(): for id in delete_ids: get_tf(p).remove(id) for t in add_or_update: for p in t.get_projects(): update_in_tf(p, t) if delete_projects: for p in delete_projects: for t in add_or_update: remove_from_tf(p, t) for tf in tfs.values(): tf.save() INDEX_PATTERN = re.compile(r'(\d+)$') def __parse_index(self, index): mo = self.INDEX_PATTERN.match(index) if mo is not None: return int(mo.group(1)) else: return None PROP_PATTERN = re.compile(r'(\S+)=(\S*)$') def __parse_prop(self, prop): mo = self.PROP_PATTERN.match(prop) if mo is not None: return mo.group(1), mo.group(2) else: return None, None CONTEXT_PATTERN = re.compile(r'@(\S+)$') def __parse_context(self, context): mo = self.CONTEXT_PATTERN.match(context) if mo is not None: return mo.group(1) else: return None PROJECT_PATTERN = re.compile(r'\+(\S+)$') def __parse_project(self, project): mo = self.PROJECT_PATTERN.match(project) if mo is not None: return mo.group(1) else: return None def __get_filtered(self, tf, argv, auto_filter=True, indexes=None, patterns=None, contexts=None, projects=None, props=None, **kw): if indexes is None: indexes = [] if props is None: props = kw else: props.update(kw) if contexts is None: contexts = [] if projects is None: projects = [] if patterns is None: patterns = [] auto_filtered = None if not argv and auto_filter: project = self.get_project() if project is not None: projects.append(project) auto_filtered = project else: i = 0 for i in range(len(argv)): arg = argv[i] if arg == '--': i += 1 break index = self.__parse_index(arg) name, value = self.__parse_prop(arg) context = self.__parse_context(arg) project = self.__parse_project(arg) if index is not None: indexes.append(index) elif name is not None: props[name] = value elif context is not None: contexts.append(context) elif project is not None: projects.append(project) else: patterns.append(arg) argv = argv[i:] return tf.filter(indexes, projects, contexts, patterns, props), auto_filtered def __cvs_commit(self, t, argv=()): pass ############################################################################ def ADD(self, type_option=None, done_option=False, edit_option=False, commit_option=False, text=None, contexts=None, projects=None, props=None, argv=(), scriptname=None, **kw): u"""%(scriptname)s: Créer une nouvelle tâche USAGE %(scriptname)s [-d] [-e] titre de la tâche OPTIONS -t Type de la tâche: TODO (par défaut) ou IDEA -d Marquer la tâche comme faite après l'avoir créée. Ceci permet de suivre une activité pour laquelle l'on n'avait pas créée un tâche au préalable. Cette option est activée par défaut avec tt. -e Editer la tâche après l'avoir créée. Ceci permet d'ajouter des informations supplémentaires. -C Faire 'cvs commit' avec le message de la tâche après avoir marqué la tâche comme faite. Il n'est pas possible de passer des arguments à cvs avec cette option. Cette option implique l'option -d""" opts, argv = getopt(argv, self.CONFOPT + 'ht:deC', self.CONFLOPT + ['help', 'type=', 'done', 'edit', 'commit']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.ADD.__doc__ % locals()) sys.exit(0) elif opt in ('-t', '--type'): type_option = value elif opt in ('-d', '--done'): done_option = True elif opt in ('-e', '--edit'): edit_option = True elif opt in ('-C', '--commit'): done_option = True commit_option = True if text is None: text = "" if contexts is None: contexts = [] if projects is None: projects = [] if props is None: props = kw else: props.update(kw) if argv: for arg in argv: name, value = self.__parse_prop(arg) context = self.__parse_context(arg) project = self.__parse_project(arg) if name is not None: props[name] = value elif context is not None: contexts.append(context) elif project is not None: projects.append(project) else: if text != "": text += " " text += arg else: # Si l'on n'a pas donné de texte, donner la possibilité de l'éditer. edit_option = True tf = self.__tf() t = None if props.has_key(ID): tasks = tf.filter(id=props[ID]) if tasks: t = tasks[0] newtask = False if t is None: t = Task() newtask = True t.set_type(type_option) t.update(props) t.set_contexts(contexts) t.set_projects(projects) t.set_text(text) if not t.has_date(): t.set_date() if done_option: t.set_done() if not t.has_projects(): project = self.get_project() if project is not None: t.add_project(project) if newtask: tf.add(t) tf.save() self.update_projects(add_or_update=t) if argv: self.__print_task(t) if edit_option: self.EDIT(t) if commit_option: self.__cvs_commit(t) ############################################################################ def __print_task(self, t, width=1, showdesc=False): parts = [] prefix_color = '' reset_color = get_color("reset") if t.is_todo_type(): if t.is_done(): prefix_color = get_color(self.tasksrc.get_done_color()) parts.append(prefix_color) parts.append(u"[x]") else: parts.append(u"[ ]") elif t.is_idea_type(): prefix_color = get_color(self.tasksrc.get_idea_color()) parts.append(prefix_color) parts.append(u"Id>") else: prefix = parts.append(u"??>") parts.append(u" %*i " % (width, t.index)) for p in t.get_projects(): parts.append(get_color(self.tasksrc.get_project_color())) if t.is_project_purged(p): parts.append("-%s" % p) else: parts.append("+%s" % p) parts.append(reset_color) parts.append(prefix_color) parts.append(u" ") for c in t.get_contexts(): parts.append(get_color(self.tasksrc.get_context_color())) parts.append("@%s" % c) parts.append(reset_color) parts.append(prefix_color) parts.append(u" ") parts.append(t.get_title().replace('\n', ' ')) parts.append(reset_color) uprint(''.join(parts)) desc = t.get_desc() if showdesc: if desc: uprint(desc) tag_color = get_color(self.tasksrc.get_tag_color()) for name, value in t.items(): uprint(u"%s%s=%s%s" % (tag_color, name, value, reset_color)) def __print_stats(self, tasks, auto_filtered=None, show_only_todos=True): numtodos = len(filter(lambda t: not t.is_done(), tasks)) numtasks = len(filter(lambda t: not t.is_todo_type(), tasks)) infos = [] if show_only_todos and numtasks != 0: infos.append(u"%i tâche(s) masquée(s)" % numtasks) if auto_filtered: infos.append(u"filtrage auto +%s" % auto_filtered) if infos: infos = u" [%s]" % u", ".join(infos) else: infos = u"" enote(u"%i / %i tâche(s) trouvée(s)%s" % (numtodos, len(tasks), infos)) def LIST(self, listall_option=False, listpurged_option=False, showdesc_option=False, showstats_option=True, indexes=None, patterns=None, contexts=None, projects=None, props=None, argv=(), scriptname=None, **kw): u"""%(scriptname)s: Lister les tâches en cours et leur état USAGE %(scriptname)s [-a] [-l] [--no-stats] filtres*|numeros* OPTIONS -a Lister aussi les tâches qui ne sont pas de type TODO -p Lister aussi les tâche purgées -l Afficher aussi la description et les tags associés aux tâches --no-stats Ne pas afficher de statistiques à la fin de la liste des tâches Cette commande prend éventuellement une liste de filtres qui sont combinées avec AND (les tâches listées doivent valider tous les filtres), sauf si on ne donne que des numéros de tâches, auquel cas on liste simplement toutes les tâches dont on a donné le numéro. Les filtre peuvent être des formes suivantes: +project Pour filtrer uniquement les tâches d'un projet. Si on se trouve dans un répertoire de projet, on filtre automatiquement sur ce projet. Si l'on veut forcer le listage des tâches de tous les projets, il faut utiliser la syntaxe +* @context Pour filtrer uniquement les tâches d'un contexte. Si l'on veut forcer le listage des tâches de tous les contextes, il faut utiliser la syntaxe @* tag=value Pour filtrer uniquement les tâches dont le tag existe et a la valeur indiquée. La syntaxe tag=* permet de filtrer simplement sur l'existence du tag. La syntaxe tag= permet de filtrer sur la NON existence du tag. Les tags valides sont pour le moment id, date, done, ${project}_purged où $project est un nom de projet valide dans cette tâche. <toute autre valeur> Afficher les tâches qui contiennent les mots spécifiés.""" opts, argv = getopt(argv, self.CONFOPT + 'hapln', self.CONFLOPT + ['help', 'list-all', 'list-purged', 'show-desc', 'no-stats']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.LIST.__doc__ % locals()) sys.exit(0) elif opt in ('-a', '--list-all'): listall_option = True elif opt in ('-p', '--list-purged'): listpurged_option = True elif opt in ('-l', '--show-desc'): showdesc_option = True elif opt in ('-n', '--no-stats'): showstats_options = False if listpurged_option: tf = FileAggregate([self.__pf(), self.__tf()]) else: tf = self.__tf() tasks, auto_filtered = self.__get_filtered(tf, argv, indexes=indexes, patterns=patterns, contexts=contexts, projects=projects, props=props) if not auto_filtered: # Si on effectue une recherche, afficher TOUS les résultats listall_option = True if tasks: width = len(str(max(map(lambda t: t.index, tasks)))) else: width = 1 for t in tasks: if not listall_option and not t.is_todo_type(): continue self.__print_task(t, width, showdesc=showdesc_option) if showstats_option: self.__print_stats(tasks, auto_filtered, not listall_option) ############################################################################ EDIT_TEMPLATE = u""" EDIT: ---------------------------------------------------------------- EDIT: Saisissez ou modifiez le titre et la description de la tâche. EDIT: EDIT: - Les lignes commencant par 'EDIT:' seront supprimées automatiquement EDIT: - Les lignes projects: et contexts: peuvent être modifiées si EDIT: nécessaire. EDIT: EDIT: ----------------------------------------------------------------""" CONTEXTS_PATTERN = re.compile(r'##\s*contexts:\s*') CTXT_PATTERN = re.compile(r'(\S+)\s*') PROJECTS_PATTERN = re.compile(r'##\s*projects:\s*') PROJ_PATTERN = re.compile(r'(\S+)\s*') PROP_PATTERN = re.compile(r'##\s*(\S+)\s*=\s*(\S+)') def __nblines(self, s): lines = s.split("\n") nblines = len(lines) if not lines[-1]: nblines -= 1 return nblines def EDIT(self, t=None, argv=(), scriptname=None, **kw): u"""%(scriptname)s: éditer une tâche dont on donne le numéro USAGE %(scriptname)s numero""" opts, argv = getopt(argv, self.CONFOPT + 'h', self.CONFLOPT + ['help']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.EDIT.__doc__ % locals()) sys.exit(0) tf = self.__tf() if t is None: count = 0 if argv: tasks, auto_filtered = self.__get_filtered(tf, argv) count = len(tasks) if count > 0: t = tasks[0] if count == 0: error("Il faut spécifier un numéro de tâche à éditer") return elif count > 1: ewarn("Seule la première tâche sera éditée") template = u"" template += u"## contexts: %s\n" % " ".join(t.get_contexts()) template += u"## projects: %s\n" % " ".join(t.get_projects()) for name, value in t.items(): template += u"## %s=%s\n" % (name, value) template += u"\n" title = t.get_title() template += u"%s\n" % title setline = self.__nblines(template) setcol = len(title) desc = t.get_desc() if desc: template += u"\n%s" % desc template += self.EDIT_TEMPLATE lines = edit_template(template, 'EDIT:', setline, setcol).split('\n') new_contexts = [] new_projects = [] new_props = {} parsing_tags = True skip_empty = True text = [] for line in lines: if skip_empty and not line: continue if parsing_tags: moc = self.CONTEXTS_PATTERN.match(line) mop = self.PROJECTS_PATTERN.match(line) mo = self.PROP_PATTERN.match(line) if moc is not None: new_contexts.extend(map(t.ensure_unicode, self.CTXT_PATTERN.findall(line[moc.end():]))) continue elif mop is not None: new_projects.extend(map(t.ensure_unicode, self.PROJ_PATTERN.findall(line[mop.end():]))) continue elif mo is not None: name, value = mo.group(1), mo.group(2) new_props[name] = t.ensure_unicode(value) continue else: parsing_tags = False skip_empty = False text.append(line) text = t.ensure_unicode("\n".join(text)) if t.get_contexts() != new_contexts: t.set_contexts(new_contexts) delete_projects = [] if t.get_projects() != new_projects: # Si la liste des projets est différente, il faudra supprimer les # tâches des projets qui ne sont plus dans la liste for p in t.get_projects(): if p not in new_projects: delete_projects.append(p) t.set_projects(new_projects) delete_ids = [] if t.props != new_props: if t.props[ID] != new_props.get(ID, None): delete_ids.append(t.props[ID]) t.clear() t.update(new_props) if t.get_text() != text: t.set_text(text) if t.is_modified(): if t.get_text(): self.__print_task(t) tf.update_ids() tf.save() self.update_projects(add_or_update=t, delete_projects=delete_projects, delete_ids=delete_ids) else: # pas de texte, il faut supprimer la tâche tf.remove(t) tf.save() self.update_projects(delete=t) enote("Tâche supprimée") else: enote("Aucune modification effectuée") ############################################################################ def DO(self, commit_option=False, force_option=False, indexes=None, patterns=None, contexts=None, projects=None, props=None, argv=(), scriptname=None, **kw): u"""%(scriptname)s: Marquer comme faites les tâches dont on donne le numéro USAGE %(scriptname)s numeros* OPTIONS -C Faire 'cvs commit' avec le message de la tâche après avoir marqué la tâche comme faite. Dans ce cas, un seul numéro de tâche est autorisé. Les autres arguments sont passés à cvs. -f Faire le commit même si la tâche est déjà marquée comme faite.""" opts, argv = getopt(argv, self.CONFOPT + 'hCf', self.CONFLOPT + ['help', 'commit', 'force']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.DO.__doc__ % locals()) sys.exit(0) elif opt in ('-C', '--commit'): commit_option = True elif opt in ('-f', '--force'): force_option = True if not argv: eerror("Il faut spécifier un numéro de tâche à marquer comme faite") return tf = self.__tf() tasks, auto_filtered = self.__get_filtered(tf, argv, indexes=indexes, patterns=patterns, contexts=contexts, projects=projects, props=props) saved = False updated = [] for t in tasks: done = t.is_done() if not done: t.set_done() if not t.is_todo_type(): t.set_type(None) if t.is_modified(): saved = False updated.append(t) self.__print_task(t) if commit_option and (force_option or not done): tf.save() saved = True self.__cvs_commit(t, argv) if updated: if not saved: tf.save() self.update_projects(add_or_update=updated) ############################################################################ def REMOVE(self, indexes=None, patterns=None, contexts=None, projects=None, props=None, argv=(), scriptname=None, **kw): u"""%(scriptname)s: Supprimer les tâches dont on donne le numéro USAGE %(scriptname)s numeros*""" opts, argv = getopt(argv, self.CONFOPT + 'h', self.CONFLOPT + ['help']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.REMOVE.__doc__ % locals()) sys.exit(0) if not argv: eerror("Il faut spécifier un numéro de tâche à supprimer") return tf = self.__tf() tasks, auto_filtered = self.__get_filtered(tf, argv, indexes=indexes, patterns=patterns, contexts=contexts, projects=projects, props=props) if len(tasks): deleted = [] for t in tasks: tf.remove(t) deleted.append(t) tf.save() self.update_projects(delete=deleted) enote("%i tâche(s) supprimée(s)" % len(tasks)) ############################################################################ def PURGE(self, argv=(), scriptname=None, **kw): u"""%(scriptname)s: Purger les tâches qui sont faites USAGE %(scriptname)s [projet] OPTIONS Si aucun projet n'est spécifié, les tâches qui sont marquées comme faites et ne sont associées à aucun projet valide sont déplacées dans le fichier d'archive. Si un projet est spécifié, alors seules les tâches associées à ce projet sont considérées. Si après avoir marqué le projet comme purgé, la tâche n'est plus associée à aucun projet valide, la tâche est purgée: elle est déplacée dans le fichier d'archive et le fichier DONE.txt du projet. Le fichier d'archive est actuellement défini à $(ppath "$purgedfile")""" opts, argv = getopt(argv, self.CONFOPT + 'h', self.CONFLOPT + ['help']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.PURGE.__doc__ % locals()) sys.exit(0) projects = [] if not argv: project = self.get_project() if project is not None: projects.append(project) else: projects.extend(argv) tf = self.__tf() pf = self.__pf() now = datef() updated = [] purged = [] for t in list(tf): if not t.is_done(): continue # la tâche est elligible à la purge if projects: # ne purger que les projets demandés for project in projects: t.set_project_purged(project, now) else: # purger tous les projets for project in t.get_projects(): t.set_project_purged(project, now) # si tous les projets sont purgés, on peut déplacer la ligne dans pf if t.is_all_purged(self.is_project): pf.add(t) tf.remove(t) purged.append(t) else: updated.append(t) if purged: pf.save() tf.save() self.update_projects(add_or_update=updated, delete=purged) enote("%i tâche(s) purgée(s)" % len(purged)) self.LIST(showstats_option=False, props={'done': '*'}) ############################################################################ def REPORT(self, argv=(), scriptname=None, **kw): u"""%(scriptname)s: Afficher un rapport sur les tâches USAGE %(scriptname)s OPTIONS Afficher une liste des projets en cours, et leur pourcentage, ainsi que les projets qui sont terminés (pour lesquels il n'y a pas de tâches.""" opts, argv = getopt(argv, self.CONFOPT + 'h', self.CONFLOPT + ['help']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.REPORT.__doc__ % locals()) sys.exit(0) ############################################################################ def UPDATE(self, argv=(), scriptname=None, **kw): u"""%(scriptname)s: Forcer la mise à jour des fichiers de projet USAGE %(scriptname)s""" opts, argv = getopt(argv, self.CONFOPT + 'h', self.CONFLOPT + ['help']) for opt, value in opts: if self.is_global_option(opt, value): pass elif opt in ('-h', '--help'): uprint(self.UPDATE.__doc__ % locals()) sys.exit(0) update = [] for t in self.__tf(): update.append(t) self.update_projects(add_or_update=update) ################################################################################ if __name__ == '__main__': action = None argv = sys.argv[1:] tasksCLI = TasksCLI() # Essayer de determiner l'action avec le nom du script if scriptname in ('ta', 'ti', 'tt'): if scriptname == 'ti': argv[0:0] = ('-t', IDEA_TYPE) if scriptname == 'tt': argv.insert(0, '-d') action = 'add' elif scriptname in ('tl', 'tll', 'tla'): if scriptname == 'tll': argv.insert(0, '-l') elif scriptname == 'tla': argv.insert(0, '-a') action = 'list' elif scriptname in ('te', ): action = 'edit' elif scriptname in ('td', 'tci'): if scriptname == 'tci': argv.insert(0, '-C') action = 'do' if action is None: opts, argv = getopt(argv, TasksCLI.CONFOPT + 'h', TasksCLI.CONFLOPT + ['help']) for opt, value in opts: if opt in ('-h', '--help'): uprint(__doc__ % dict(scriptname=scriptname)) sys.exit(0) elif tasksCLI.is_global_option(opt, value): pass if not argv: uprint(__doc__ % dict(scriptname=scriptname)) sys.exit(0) action, argv = argv[0], argv[1:] if action in ('add', 'a', 'done'): if action == 'done': argv.insert(0, '-d') action = 'add' elif action in ('list', 'l', 'll', 'la'): if action == 'll': argv.insert(0, '-l') elif action == 'la': argv.insert(0, '-a') action = 'list' elif action in ('edit', 'e'): action = 'edit' elif action in ('do', 'd', 'commit', 'c'): if action in ('commit', 'c'): argv.insert(0, '-C') action = 'do' elif action in ('remove', 'r'): action = 'remove' elif action in ('purge', 'p'): action = 'purge' elif action in ('report', ): action = 'report' elif action in ('update', ): action = 'update' else: eerror("Action inconnue: %s" % action) sys.exit(1) if scriptname in ('Tasks.py', 't'): # pour l'affichage de l'aide scriptname = '%s %s' % (scriptname, action) apply(getattr(tasksCLI, action.upper()), (), {'argv': argv, 'scriptname': scriptname})