68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
# -*- coding: utf-8 mode: python -*- vim:sw=4:sts=4:et:ai:si:sta:fenc=utf-8
|
|
|
|
"""Gestion des verrous sur les fichiers.
|
|
"""
|
|
|
|
__all__ = ('cxopen', 'rdopen', 'wropen')
|
|
|
|
import os
|
|
from os import O_CREAT, O_EXCL, O_RDONLY, O_RDWR
|
|
|
|
from .uio import _s
|
|
|
|
def __check_mode(mode, *values, **kw):
|
|
for value in values:
|
|
if mode.startswith(value):
|
|
unless = kw.get("unless", None)
|
|
if unless is None or not mode.startswith(unless):
|
|
return
|
|
expected = values[0:1] and values[0] or u"?"
|
|
raise ValueError("mode should start with '%s', got '%s'" % (expected, mode))
|
|
|
|
def cxopen(file, mode="r+b", bufsize=-1, perms=0777):
|
|
u"""Créer atomiquement un fichier et l'ouvrir en lecture/écriture.
|
|
|
|
@raise OSError: si le fichier existe déjà.
|
|
"""
|
|
__check_mode(mode, 'w', 'a', 'r+')
|
|
fd = os.open(file, O_CREAT | O_EXCL | O_RDWR, perms)
|
|
return os.fdopen(fd, mode)
|
|
|
|
if os.name == 'posix':
|
|
import fcntl
|
|
from fcntl import LOCK_SH, LOCK_EX, LOCK_NB
|
|
|
|
def rdopen(file, mode="rb", bufsize=-1, nowait=False):
|
|
u"""Ouvrir un fichier pour la lecture en tentant d'acquérir un verrou
|
|
partagé pour sa lecture.
|
|
|
|
Le verrou est libéré lors de la fermeture du fichier.
|
|
"""
|
|
__check_mode(mode, 'r', unless='r+')
|
|
fd = os.open(file, O_RDONLY)
|
|
fcntl.flock(fd, LOCK_SH | (nowait and LOCK_NB or 0))
|
|
return os.fdopen(fd, mode, bufsize)
|
|
|
|
def wropen(file, mode="r+b", bufsize=-1, perms=0777, nowait=False):
|
|
u"""Ouvrir un fichier pour la lecture/écriture en tentant d'acquérir
|
|
un verrou exclusif pour son écriture.
|
|
|
|
Le verrou est libéré lors de la fermeture du fichier.
|
|
"""
|
|
__check_mode(mode, 'w', 'a', 'r+')
|
|
fd = os.open(file, O_CREAT | O_RDWR, perms)
|
|
fcntl.flock(fd, LOCK_EX | (nowait and LOCK_NB or 0))
|
|
return os.fdopen(fd, mode, bufsize)
|
|
|
|
else:
|
|
# pas de verrouillage sur les plateformes non supportées
|
|
def rdopen(file, mode="rb", bufsize=-1, nowait=False):
|
|
__check_mode(mode, 'r', unless='r+')
|
|
fd = os.open(file, O_RDONLY)
|
|
return os.fdopen(fd, mode, bufsize)
|
|
|
|
def wropen(file, mode="r+b", bufsize=-1, perms=0777, nowait=False):
|
|
__check_mode(mode, 'w', 'a', 'r+')
|
|
fd = os.open(file, O_CREAT | O_RDWR, perms)
|
|
return os.fdopen(fd, mode, bufsize)
|