Coverage for /usr/local/lib/python3.7/site-packages/py/_path/local.py : 17%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2local path implementation.
3"""
4from __future__ import with_statement
6from contextlib import contextmanager
7import sys, os, atexit, io, uuid
8import py
9from py._path import common
10from py._path.common import iswin32, fspath
11from stat import S_ISLNK, S_ISDIR, S_ISREG
13from os.path import abspath, normpath, isabs, exists, isdir, isfile, islink, dirname
if sys.version_info > (3,0):
    def map_as_list(func, iter):
        """ apply ``func`` to every item of ``iter`` and return a real list.

        On Python 3 the builtin ``map`` is lazy, so its result is
        materialized here.
        """
        mapped = map(func, iter)
        return list(mapped)
else:
    # on Python 2 the builtin map already returns a list
    map_as_list = map
# The "importlib" mode of LocalPath.pyimport() needs
# importlib.util.spec_from_file_location / module_from_spec.
ALLOW_IMPORTLIB_MODE = sys.version_info > (3,5)
if ALLOW_IMPORTLIB_MODE:
    # Import the submodule explicitly: "import importlib" alone does not
    # guarantee that the "importlib.util" attribute is available.
    import importlib
    import importlib.util
class Stat(object):
    """ thin wrapper around an ``os.stat()``/``os.lstat()`` result.

    Attributes of the wrapped result are reachable without their ``st_``
    prefix, e.g. ``stat.mode`` is ``osstatresult.st_mode``.
    """

    def __getattr__(self, name):
        # Only called for names not found normally.  Private/dunder names
        # are never forwarded: without this guard a lookup on an instance
        # whose ``_osstatresult`` is not set yet (e.g. while being copied)
        # would re-enter __getattr__ forever.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._osstatresult, "st_" + name)

    def __init__(self, path, osstatresult):
        """ remember the originating ``path`` object and the raw stat result. """
        self.path = path
        self._osstatresult = osstatresult

    @property
    def owner(self):
        """ return owner name of file (looked up from ``uid`` via ``pwd``). """
        if iswin32:
            raise NotImplementedError("XXX win32")
        import pwd
        entry = py.error.checked_call(pwd.getpwuid, self.uid)
        return entry[0]

    @property
    def group(self):
        """ return group name of file. """
        if iswin32:
            raise NotImplementedError("XXX win32")
        import grp
        entry = py.error.checked_call(grp.getgrgid, self.gid)
        return entry[0]

    def isdir(self):
        """ return True if the wrapped stat result describes a directory. """
        return S_ISDIR(self._osstatresult.st_mode)

    def isfile(self):
        """ return True if the wrapped stat result describes a regular file. """
        return S_ISREG(self._osstatresult.st_mode)

    def islink(self):
        """ return True if the path itself is a symbolic link.

        The wrapped result may come from ``stat()``, which follows links,
        so the decision is made on a fresh ``lstat()`` of the path.
        """
        st = self.path.lstat()
        return S_ISLNK(st.mode)
class PosixPath(common.PathBase):
    """ posix-only path operations (ownership, hard links, symlinks). """

    def chown(self, user, group, rec=0):
        """ change ownership to the given user and group.
            user and group may be specified by a number or
            by a name.  if rec is True change ownership
            recursively.
        """
        uid = getuserid(user)
        gid = getgroupid(group)

        def notlink(p):
            return p.check(link=0)

        if rec:
            # symlinks are neither followed nor chown'ed themselves
            for entry in self.visit(rec=notlink):
                if notlink(entry):
                    py.error.checked_call(os.chown, str(entry), uid, gid)
        py.error.checked_call(os.chown, str(self), uid, gid)

    def readlink(self):
        """ return value of a symbolic link. """
        linkpath = self.strpath
        return py.error.checked_call(os.readlink, linkpath)

    def mklinkto(self, oldname):
        """ posix style hard link to another name. """
        source, dest = str(oldname), str(self)
        py.error.checked_call(os.link, source, dest)

    def mksymlinkto(self, value, absolute=1):
        """ create a symbolic link with the given value (pointing to another name). """
        if absolute:
            py.error.checked_call(os.symlink, str(value), self.strpath)
            return
        base = self.common(value)
        # with posix local paths '/' is always a common base
        relsource = self.__class__(value).relto(base)
        reldest = self.relto(base)
        # one '..' for every separator between the common base and this link
        climb = ['..'] * reldest.count(self.sep)
        target = self.sep.join(climb + [relsource])
        py.error.checked_call(os.symlink, target, self.strpath)
def getuserid(user):
    """ return the numeric user id for ``user``.

    Integers are returned unchanged; anything else is looked up as a
    user name via ``pwd.getpwnam``.
    """
    import pwd
    if isinstance(user, int):
        return user
    return pwd.getpwnam(user)[2]
def getgroupid(group):
    """ return the numeric group id for ``group``.

    Integers are returned unchanged; anything else is looked up as a
    group name via ``grp.getgrnam``.
    """
    import grp
    if isinstance(group, int):
        return group
    return grp.getgrnam(group)[2]
# base class of LocalPath: the posix extensions everywhere except on win32
FSBase = common.PathBase if iswin32 else PosixPath
class LocalPath(FSBase):
    """ object oriented interface to os.path and other local filesystem
        related information.
    """
    class ImportMismatchError(ImportError):
        """ raised on pyimport() if there is a mismatch of __file__'s"""

    sep = os.sep

    class Checkers(common.Checkers):
        """ filesystem checks backed by a cached stat of ``self.path``. """

        def _stat(self):
            """ return the stat of the path, computing it at most once. """
            try:
                return self._statcache
            except AttributeError:
                try:
                    self._statcache = self.path.stat()
                except py.error.ELOOP:
                    # stat() failed with ELOOP: fall back to lstat()
                    self._statcache = self.path.lstat()
                return self._statcache

        def dir(self):
            return S_ISDIR(self._stat().mode)

        def file(self):
            return S_ISREG(self._stat().mode)

        def exists(self):
            # returns the stat object itself rather than a bool
            return self._stat()

        def link(self):
            st = self.path.lstat()
            return S_ISLNK(st.mode)

    def __init__(self, path=None, expanduser=False):
        """ Initialize and return a local Path instance.

        Path can be relative to the current directory.
        If path is None it defaults to the current working directory.
        If expanduser is True, tilde-expansion is performed.
        Note that Path instances always carry an absolute path.

        Raises ValueError if ``path`` cannot be converted by ``fspath``.
        """
        if path is None:
            self.strpath = py.error.checked_call(os.getcwd)
        else:
            try:
                path = fspath(path)
            except TypeError:
                raise ValueError("can only pass None, Path instances "
                                 "or non-empty strings to LocalPath")
            if expanduser:
                path = os.path.expanduser(path)
            self.strpath = abspath(path)

    def __hash__(self):
        return hash(self.strpath)

    def __eq__(self, other):
        """ compare by path string; case-insensitively on win32. """
        s1 = fspath(self)
        try:
            s2 = fspath(other)
        except TypeError:
            return False
        if iswin32:
            s1 = s1.lower()
            try:
                s2 = s2.lower()
            except AttributeError:
                return False
        return s1 == s2

    def __ne__(self, other):
        return not (self == other)

    def __lt__(self, other):
        return fspath(self) < fspath(other)

    def __gt__(self, other):
        return fspath(self) > fspath(other)

    def samefile(self, other):
        """ return True if 'other' references the same file as 'self'.
        """
        other = fspath(other)
        if not isabs(other):
            other = abspath(other)
        if self == other:
            return True
        if iswin32:
            return False # there is no samefile
        return py.error.checked_call(
                os.path.samefile, self.strpath, other)

    def remove(self, rec=1, ignore_errors=False):
        """ remove a file or directory (or a directory tree if rec=1).
        if ignore_errors is True, errors while removing directories will
        be ignored.
        """
        if self.check(dir=1, link=0):
            if rec:
                # force remove of readonly files on windows
                if iswin32:
                    self.chmod(0o700, rec=1)
                import shutil
                py.error.checked_call(
                    shutil.rmtree, self.strpath,
                    ignore_errors=ignore_errors)
            else:
                py.error.checked_call(os.rmdir, self.strpath)
        else:
            # plain files and symlinks (even symlinks to directories)
            if iswin32:
                self.chmod(0o700)
            py.error.checked_call(os.remove, self.strpath)

    def computehash(self, hashtype="md5", chunksize=524288):
        """ return hexdigest of hashvalue for this file.

        ``hashtype`` names a constructor of ``hashlib``; the file is read
        in pieces of ``chunksize`` bytes.  Raises ValueError for an
        unknown hash type.
        """
        try:
            try:
                import hashlib as mod
            except ImportError:
                # very old pythons: per-algorithm modules ("md5", "sha")
                if hashtype == "sha1":
                    hashtype = "sha"
                mod = __import__(hashtype)
            digest = getattr(mod, hashtype)()
        except (AttributeError, ImportError):
            raise ValueError("Don't know how to compute %r hash" %(hashtype,))
        with self.open('rb') as f:
            while 1:
                buf = f.read(chunksize)
                if not buf:
                    return digest.hexdigest()
                digest.update(buf)

    def new(self, **kw):
        """ create a modified version of this path.
            the following keyword arguments modify various path parts::

              a:/some/path/to/a/file.ext
              xx                           drive
              xxxxxxxxxxxxxxxxx            dirname
                                xxxxxxxx   basename
                                xxxx       purebasename
                                    xxx    ext

            'basename' cannot be combined with 'purebasename' or 'ext'
            (ValueError).
        """
        obj = object.__new__(self.__class__)
        if not kw:
            obj.strpath = self.strpath
            return obj
        drive, dirname, basename, purebasename,ext = self._getbyspec(
             "drive,dirname,basename,purebasename,ext")
        if 'basename' in kw:
            if 'purebasename' in kw or 'ext' in kw:
                raise ValueError("invalid specification %r" % kw)
        else:
            pb = kw.setdefault('purebasename', purebasename)
            try:
                ext = kw['ext']
            except KeyError:
                pass
            else:
                # accept both "txt" and ".txt"
                if ext and not ext.startswith('.'):
                    ext = '.' + ext
            kw['basename'] = pb + ext

        if ('dirname' in kw and not kw['dirname']):
            # an explicitly empty dirname means "directly below the drive"
            kw['dirname'] = drive
        else:
            kw.setdefault('dirname', dirname)
        kw.setdefault('sep', self.sep)
        obj.strpath = normpath(
            "%(dirname)s%(sep)s%(basename)s" % kw)
        return obj

    def _getbyspec(self, spec):
        """ see new for what 'spec' can be.

        ``spec`` is a comma separated list of part names; the matching
        parts are returned as a list in the same order.
        """
        res = []
        parts = self.strpath.split(self.sep)

        args = filter(None, spec.split(',') )
        append = res.append
        for name in args:
            if name == 'drive':
                append(parts[0])
            elif name == 'dirname':
                append(self.sep.join(parts[:-1]))
            else:
                basename = parts[-1]
                if name == 'basename':
                    append(basename)
                else:
                    # split at the last dot; the dot stays with the ext
                    i = basename.rfind('.')
                    if i == -1:
                        purebasename, ext = basename, ''
                    else:
                        purebasename, ext = basename[:i], basename[i:]
                    if name == 'purebasename':
                        append(purebasename)
                    elif name == 'ext':
                        append(ext)
                    else:
                        raise ValueError("invalid part specification %r" % name)
        return res

    def dirpath(self, *args, **kwargs):
        """ return the directory path joined with any given path arguments.  """
        if not kwargs:
            # fast path that bypasses __init__
            path = object.__new__(self.__class__)
            path.strpath = dirname(self.strpath)
            if args:
                path = path.join(*args)
            return path
        return super(LocalPath, self).dirpath(*args, **kwargs)

    def join(self, *args, **kwargs):
        """ return a new path by appending all 'args' as path
        components.  if abs=1 is used restart from root if any
        of the args is an absolute path.
        """
        sep = self.sep
        strargs = [fspath(arg) for arg in args]
        strpath = self.strpath
        if kwargs.get('abs'):
            # the last absolute argument wins; keep only what follows it
            newargs = []
            for arg in reversed(strargs):
                if isabs(arg):
                    strpath = arg
                    strargs = newargs
                    break
                newargs.insert(0, arg)
        # special case for when we have e.g. strpath == "/"
        actual_sep = "" if strpath.endswith(sep) else sep
        for arg in strargs:
            arg = arg.strip(sep)
            if iswin32:
                # allow unix style paths even on windows.
                arg = arg.strip('/')
                arg = arg.replace('/', sep)
            strpath = strpath + actual_sep + arg
            actual_sep = sep
        obj = object.__new__(self.__class__)
        obj.strpath = normpath(strpath)
        return obj

    def open(self, mode='r', ensure=False, encoding=None):
        """ return an opened file with the given mode.

        If ensure is True, create parent directories if needed.
        If encoding is given, the file is opened through ``io.open``.
        """
        if ensure:
            self.dirpath().ensure(dir=1)
        if encoding:
            return py.error.checked_call(io.open, self.strpath, mode, encoding=encoding)
        return py.error.checked_call(open, self.strpath, mode)

    def _fastjoin(self, name):
        """ return the child ``name`` without any normalization. """
        child = object.__new__(self.__class__)
        child.strpath = self.strpath + self.sep + name
        return child

    def islink(self):
        return islink(self.strpath)

    def check(self, **kw):
        """ check the path against the given keyword conditions.

        Without arguments this is an existence check.  A single ``dir``
        or ``file`` keyword is answered directly via os.path; everything
        else is delegated to the base class.
        """
        if not kw:
            return exists(self.strpath)
        if len(kw) == 1:
            if "dir" in kw:
                return not kw["dir"] ^ isdir(self.strpath)
            if "file" in kw:
                return not kw["file"] ^ isfile(self.strpath)
        return super(LocalPath, self).check(**kw)

    # characters that make a string filter a pattern rather than a plain name
    _patternchars = set("*?[" + os.path.sep)
    def listdir(self, fil=None, sort=None):
        """ list directory contents, possibly filter by the given fil func
            and possibly sorted.
        """
        if fil is None and sort is None:
            names = py.error.checked_call(os.listdir, self.strpath)
            return map_as_list(self._fastjoin, names)
        if isinstance(fil, py.builtin._basestring):
            if not self._patternchars.intersection(fil):
                # plain name: a single existence check replaces the listing
                child = self._fastjoin(fil)
                if exists(child.strpath):
                    return [child]
                return []
            fil = common.FNMatcher(fil)
        names = py.error.checked_call(os.listdir, self.strpath)
        res = []
        for name in names:
            child = self._fastjoin(name)
            if fil is None or fil(child):
                res.append(child)
        self._sortlist(res, sort)
        return res

    def size(self):
        """ return size of the underlying file object """
        return self.stat().size

    def mtime(self):
        """ return last modification time of the path. """
        return self.stat().mtime

    def copy(self, target, mode=False, stat=False):
        """ copy path to target.

            If mode is True, will copy permission from path to target.
            If stat is True, copy permission, last modification
            time, last access time, and flags from path to target.
        """
        if self.check(file=1):
            if target.check(dir=1):
                target = target.join(self.basename)
            assert self!=target
            copychunked(self, target)
            if mode:
                copymode(self.strpath, target.strpath)
            if stat:
                copystat(self, target)
        else:
            def rec(p):
                # do not descend into symlinked directories
                return p.check(link=0)
            for x in self.visit(rec=rec):
                relpath = x.relto(self)
                newx = target.join(relpath)
                newx.dirpath().ensure(dir=1)
                if x.check(link=1):
                    # links are re-created; no mode/stat copy for them
                    newx.mksymlinkto(x.readlink())
                    continue
                elif x.check(file=1):
                    copychunked(x, newx)
                elif x.check(dir=1):
                    newx.ensure(dir=1)
                if mode:
                    copymode(x.strpath, newx.strpath)
                if stat:
                    copystat(x, newx)

    def rename(self, target):
        """ rename this path to target. """
        target = fspath(target)
        return py.error.checked_call(os.rename, self.strpath, target)

    def dump(self, obj, bin=1):
        """ pickle object into path location

        ``bin`` is handed to ``pickle.dump`` as its third argument.
        """
        import pickle
        with self.open('wb') as f:
            py.error.checked_call(pickle.dump, obj, f, bin)

    def mkdir(self, *args):
        """ create & return the directory joined with args. """
        p = self.join(*args)
        py.error.checked_call(os.mkdir, fspath(p))
        return p

    def write_binary(self, data, ensure=False):
        """ write binary data into path.   If ensure is True create
        missing parent directories.
        """
        if ensure:
            self.dirpath().ensure(dir=1)
        with self.open('wb') as f:
            f.write(data)

    def write_text(self, data, encoding, ensure=False):
        """ write text data into path using the specified encoding.
        If ensure is True create missing parent directories.
        """
        if ensure:
            self.dirpath().ensure(dir=1)
        with self.open('w', encoding=encoding) as f:
            f.write(data)

    def write(self, data, mode='w', ensure=False):
        """ write data into path.   If ensure is True create
        missing parent directories.

        In binary mode only bytes are accepted (ValueError otherwise);
        in text mode bytes are decoded with the default encoding and
        other non-text objects are converted with ``str``.
        """
        if ensure:
            self.dirpath().ensure(dir=1)
        if 'b' in mode:
            if not py.builtin._isbytes(data):
                raise ValueError("can only process bytes")
        else:
            if not py.builtin._istext(data):
                if not py.builtin._isbytes(data):
                    data = str(data)
                else:
                    data = py.builtin._totext(data, sys.getdefaultencoding())
        with self.open(mode) as f:
            f.write(data)

    def _ensuredirs(self):
        """ create this directory and all missing parents; return self. """
        parent = self.dirpath()
        if parent == self:
            # reached the filesystem root
            return self
        if parent.check(dir=0):
            parent._ensuredirs()
        if self.check(dir=0):
            try:
                self.mkdir()
            except py.error.EEXIST:
                # race condition: file/dir created by another thread/process.
                # complain if it is not a dir
                if self.check(dir=0):
                    raise
        return self

    def ensure(self, *args, **kwargs):
        """ ensure that an args-joined path exists (by default as
            a file). if you specify a keyword argument 'dir=True'
            then the path is forced to be a directory path.
        """
        p = self.join(*args)
        if kwargs.get('dir', 0):
            return p._ensuredirs()
        else:
            p.dirpath()._ensuredirs()
            if not p.check(file=1):
                p.open('w').close()
            return p

    def stat(self, raising=True):
        """ Return a Stat object wrapping the os.stat() result.

        If ``raising`` is not True, None is returned instead of raising
        when the stat call fails.
        """
        if raising == True:
            return Stat(self, py.error.checked_call(os.stat, self.strpath))
        try:
            return Stat(self, os.stat(self.strpath))
        except KeyboardInterrupt:
            raise
        except Exception:
            return None

    def lstat(self):
        """ Return a Stat object wrapping the os.lstat() result. """
        return Stat(self, py.error.checked_call(os.lstat, self.strpath))

    def setmtime(self, mtime=None):
        """ set modification time for the given path.  if 'mtime' is None
        (the default) then the file's mtime is set to current time.

        Note that the resolution for 'mtime' is platform dependent.
        """
        if mtime is None:
            return py.error.checked_call(os.utime, self.strpath, mtime)
        try:
            return py.error.checked_call(os.utime, self.strpath, (-1, mtime))
        except py.error.EINVAL:
            # -1 as access time was rejected: retry with the current atime
            return py.error.checked_call(os.utime, self.strpath, (self.atime(), mtime))

    def chdir(self):
        """ change directory to self and return old current directory """
        try:
            old = self.__class__()
        except py.error.ENOENT:
            # the previous working directory does not exist anymore
            old = None
        py.error.checked_call(os.chdir, self.strpath)
        return old


    @contextmanager
    def as_cwd(self):
        """
        Return a context manager, which changes to the path's dir during the
        managed "with" context.
        On __enter__ it returns the old dir, which might be ``None``.
        """
        old = self.chdir()
        try:
            yield old
        finally:
            if old is not None:
                old.chdir()

    def realpath(self):
        """ return a new path which contains no symbolic links."""
        return self.__class__(os.path.realpath(self.strpath))

    def atime(self):
        """ return last access time of the path. """
        return self.stat().atime

    def __repr__(self):
        return 'local(%r)' % self.strpath

    def __str__(self):
        """ return string representation of the Path. """
        return self.strpath

    def chmod(self, mode, rec=0):
        """ change permissions to the given mode. If mode is an
            integer it directly encodes the os-specific modes.
            if rec is True perform recursively.

            Raises TypeError if mode is not an integer.
        """
        if not isinstance(mode, int):
            raise TypeError("mode %r must be an integer" % (mode,))
        if rec:
            for x in self.visit(rec=rec):
                py.error.checked_call(os.chmod, str(x), mode)
        py.error.checked_call(os.chmod, self.strpath, mode)

    def pypkgpath(self):
        """ return the Python package path by looking for the last
        directory upwards which still contains an __init__.py.
        Return None if a pkgpath can not be determined.
        """
        pkgpath = None
        for parent in self.parts(reverse=True):
            if parent.isdir():
                if not parent.join('__init__.py').exists():
                    break
                if not isimportable(parent.basename):
                    break
                pkgpath = parent
        return pkgpath

    def _ensuresyspath(self, ensuremode, path):
        """ put ``path`` on sys.path according to ``ensuremode``.

        A false mode does nothing, "append" appends the path if missing,
        any other true value inserts it at the front unless it is
        already the first entry.
        """
        if ensuremode:
            s = str(path)
            if ensuremode == "append":
                if s not in sys.path:
                    sys.path.append(s)
            else:
                if s != sys.path[0]:
                    sys.path.insert(0, s)

    def pyimport(self, modname=None, ensuresyspath=True):
        """ return path as an imported python module.

        If modname is None, look for the containing package
        and construct an according module name.
        The module will be put/looked up in sys.modules.
        if ensuresyspath is True then the root dir for importing
        the file (taking __init__.py files into account) will
        be prepended to sys.path if it isn't there already.
        If ensuresyspath=="append" the root dir will be appended
        if it isn't already contained in sys.path.
        if ensuresyspath is False no modification of syspath happens.

        Special value of ensuresyspath=="importlib" is intended
        purely for using in pytest, it is capable only of importing
        separate .py files outside packages, e.g. for test suite
        without any __init__.py file. It effectively allows having
        same-named test modules in different places and offers
        mild opt-in via this option. Note that it works only in
        recent versions of python.
        """
        if not self.check():
            raise py.error.ENOENT(self)

        if ensuresyspath == 'importlib':
            if modname is None:
                modname = self.purebasename
            if not ALLOW_IMPORTLIB_MODE:
                raise ImportError(
                    "Can't use importlib due to old version of Python")
            # import the submodule explicitly instead of relying on it
            # having been loaded as a side effect of something else
            import importlib.util as importlib_util
            spec = importlib_util.spec_from_file_location(
                modname, str(self))
            if spec is None:
                raise ImportError(
                    "Can't find module %s at location %s" %
                    (modname, str(self))
                )
            mod = importlib_util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            return mod

        pkgpath = None
        if modname is None:
            pkgpath = self.pypkgpath()
            if pkgpath is not None:
                pkgroot = pkgpath.dirpath()
                names = self.new(ext="").relto(pkgroot).split(self.sep)
                if names[-1] == "__init__":
                    names.pop()
                modname = ".".join(names)
            else:
                pkgroot = self.dirpath()
                modname = self.purebasename

            self._ensuresyspath(ensuresyspath, pkgroot)
            __import__(modname)
            mod = sys.modules[modname]
            if self.basename == "__init__.py":
                return mod # we don't check anything as we might
                           # be in a namespace package ... too icky to check
            # normalize the imported module's file name to its source file
            modfile = mod.__file__
            if modfile[-4:] in ('.pyc', '.pyo'):
                modfile = modfile[:-1]
            elif modfile.endswith('$py.class'):
                modfile = modfile[:-9] + '.py'
            if modfile.endswith(os.path.sep + "__init__.py"):
                if self.basename != "__init__.py":
                    modfile = modfile[:-12]
            try:
                issame = self.samefile(modfile)
            except py.error.ENOENT:
                issame = False
            if not issame:
                ignore = os.getenv('PY_IGNORE_IMPORTMISMATCH')
                if ignore != '1':
                    raise self.ImportMismatchError(modname, modfile, self)
            return mod
        else:
            try:
                return sys.modules[modname]
            except KeyError:
                # we have a custom modname, do a pseudo-import
                import types
                mod = types.ModuleType(modname)
                mod.__file__ = str(self)
                sys.modules[modname] = mod
                try:
                    py.builtin.execfile(str(self), mod.__dict__)
                except:
                    # never leave a half-initialized module registered;
                    # the original error is re-raised unchanged
                    del sys.modules[modname]
                    raise
                return mod

    def sysexec(self, *argv, **popen_opts):
        """ return stdout text from executing a system child process,
            where the 'self' path points to executable.
            The process is directly invoked and not through a system shell.

            A non-zero exit status raises py.process.cmdexec.Error.
        """
        from subprocess import Popen, PIPE
        argv = map_as_list(str, argv)
        # stdout/stderr are always captured, overriding caller supplied values
        popen_opts['stdout'] = popen_opts['stderr'] = PIPE
        proc = Popen([str(self)] + argv, **popen_opts)
        stdout, stderr = proc.communicate()
        ret = proc.wait()
        if py.builtin._isbytes(stdout):
            stdout = py.builtin._totext(stdout, sys.getdefaultencoding())
        if ret != 0:
            if py.builtin._isbytes(stderr):
                stderr = py.builtin._totext(stderr, sys.getdefaultencoding())
            raise py.process.cmdexec.Error(ret, ret, str(self),
                                           stdout, stderr,)
        return stdout

    @classmethod
    def sysfind(cls, name, checker=None, paths=None):
        """ return a path object found by looking at the systems
            underlying PATH specification. If the checker is not None
            it will be invoked to filter matching paths.  If a binary
            cannot be found, None is returned
            Note: This is probably not working on plain win32 systems
            but may work on cygwin.
        """
        if isabs(name):
            p = py.path.local(name)
            if p.check(file=1):
                return p
        else:
            if paths is None:
                if iswin32:
                    paths = os.environ['Path'].split(';')
                    if '' not in paths and '.' not in paths:
                        paths.append('.')
                    try:
                        systemroot = os.environ['SYSTEMROOT']
                    except KeyError:
                        pass
                    else:
                        paths = [path.replace('%SystemRoot%', systemroot)
                                 for path in paths]
                else:
                    paths = os.environ['PATH'].split(':')
            # candidate suffixes: PATHEXT entries on win32, then the bare name
            tryadd = []
            if iswin32:
                tryadd += os.environ['PATHEXT'].split(os.pathsep)
            tryadd.append("")

            for x in paths:
                for addext in tryadd:
                    p = py.path.local(x).join(name, abs=True) + addext
                    try:
                        if p.check(file=1):
                            if checker:
                                if not checker(p):
                                    continue
                            return p
                    except py.error.EACCES:
                        pass
        return None

    @classmethod
    def _gethomedir(cls):
        """ return the home directory from the environment, or None. """
        try:
            x = os.environ['HOME']
        except KeyError:
            try:
                x = os.environ["HOMEDRIVE"] + os.environ['HOMEPATH']
            except KeyError:
                return None
        return cls(x)

    # """
    # special class constructors for local filesystem paths
    # """
    @classmethod
    def get_temproot(cls):
        """ return the system's temporary directory
            (where tempfiles are usually created in)
        """
        import tempfile
        return py.path.local(tempfile.gettempdir())

    @classmethod
    def mkdtemp(cls, rootdir=None):
        """ return a Path object pointing to a fresh new temporary directory
            (which we created ourself).
        """
        import tempfile
        if rootdir is None:
            rootdir = cls.get_temproot()
        return cls(py.error.checked_call(tempfile.mkdtemp, dir=str(rootdir)))

    @classmethod
    def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3,
                          lock_timeout=172800):   # two days
        """ return unique directory with a number greater than the current
            maximum one.  The number is assumed to start directly after prefix.
            if keep is true directories with a number less than (maxnum-keep)
            will be removed. If .lock files are used (lock_timeout non-zero),
            algorithm is multi-process safe.
        """
        if rootdir is None:
            rootdir = cls.get_temproot()

        nprefix = prefix.lower()
        def parse_num(path):
            """ parse the number out of a path (if it matches the prefix) """
            nbasename = path.basename.lower()
            if nbasename.startswith(nprefix):
                try:
                    return int(nbasename[len(nprefix):])
                except ValueError:
                    pass

        def create_lockfile(path):
            """ exclusively create lockfile. Throws when failed """
            mypid = os.getpid()
            lockfile = path.join('.lock')
            if hasattr(lockfile, 'mksymlinkto'):
                lockfile.mksymlinkto(str(mypid))
            else:
                fd = py.error.checked_call(os.open, str(lockfile), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
                with os.fdopen(fd, 'w') as f:
                    f.write(str(mypid))
            return lockfile

        def atexit_remove_lockfile(lockfile):
            """ ensure lockfile is removed at process exit """
            mypid = os.getpid()
            def try_remove_lockfile():
                # in a fork() situation, only the last process should
                # remove the .lock, otherwise the other processes run the
                # risk of seeing their temporary dir disappear.  For now
                # we remove the .lock in the parent only (i.e. we assume
                # that the children finish before the parent).
                if os.getpid() != mypid:
                    return
                try:
                    lockfile.remove()
                except py.error.Error:
                    pass
            atexit.register(try_remove_lockfile)

        # compute the maximum number currently in use with the prefix
        lastmax = None
        while True:
            maxnum = -1
            for path in rootdir.listdir():
                num = parse_num(path)
                if num is not None:
                    maxnum = max(maxnum, num)

            # make the new directory
            try:
                udir = rootdir.mkdir(prefix + str(maxnum+1))
                if lock_timeout:
                    lockfile = create_lockfile(udir)
                    atexit_remove_lockfile(lockfile)
            except (py.error.EEXIST, py.error.ENOENT, py.error.EBUSY):
                # race condition (1): another thread/process created the dir
                #                     in the meantime - try again
                # race condition (2): another thread/process spuriously acquired
                #                     lock treating empty directory as candidate
                #                     for removal - try again
                # race condition (3): another thread/process tried to create the lock at
                #                     the same time (happened in Python 3.3 on Windows)
                # https://ci.appveyor.com/project/pytestbot/py/build/1.0.21/job/ffi85j4c0lqwsfwa
                if lastmax == maxnum:
                    # no progress since the previous attempt: give up
                    raise
                lastmax = maxnum
                continue
            break

        def get_mtime(path):
            """ read file modification time """
            try:
                return path.lstat().mtime
            except py.error.Error:
                pass

        garbage_prefix = prefix + 'garbage-'

        def is_garbage(path):
            """ check if path denotes directory scheduled for removal """
            bn = path.basename
            return bn.startswith(garbage_prefix)

        # prune old directories
        udir_time = get_mtime(udir)
        if keep and udir_time:
            for path in rootdir.listdir():
                num = parse_num(path)
                if num is not None and num <= (maxnum - keep):
                    try:
                        # try acquiring lock to remove directory as exclusive user
                        if lock_timeout:
                            create_lockfile(path)
                    except (py.error.EEXIST, py.error.ENOENT, py.error.EBUSY):
                        path_time = get_mtime(path)
                        if not path_time:
                            # assume directory doesn't exist now
                            continue
                        if abs(udir_time - path_time) < lock_timeout:
                            # assume directory with lockfile exists
                            # and lock timeout hasn't expired yet
                            continue

                    # path dir locked for exclusive use
                    # and scheduled for removal to avoid another thread/process
                    # treating it as a new directory or removal candidate
                    garbage_path = rootdir.join(garbage_prefix + str(uuid.uuid4()))
                    try:
                        path.rename(garbage_path)
                        garbage_path.remove(rec=1)
                    except Exception:
                        # best effort: this might be py.error.Error,
                        # WindowsError ...  KeyboardInterrupt and SystemExit
                        # are not Exception subclasses and still propagate.
                        pass
                if is_garbage(path):
                    try:
                        path.remove(rec=1)
                    except Exception:
                        # best effort, see above
                        pass

        # make link...
        try:
            username = os.environ['USER']           #linux, et al
        except KeyError:
            try:
                username = os.environ['USERNAME']   #windows
            except KeyError:
                username = 'current'

        src  = str(udir)
        dest = src[:src.rfind('-')] + '-' + username
        try:
            os.unlink(dest)
        except OSError:
            pass
        try:
            os.symlink(src, dest)
        except (OSError, AttributeError, NotImplementedError):
            pass

        return udir
def copymode(src, dest):
    """ copy the permission bits of ``src`` onto ``dest`` (both path strings). """
    from shutil import copymode as shutil_copymode
    shutil_copymode(src, dest)
def copystat(src, dest):
    """ copy permission, last modification time, last access time and
    flags from ``src`` to ``dest``; both arguments are converted with str().
    """
    from shutil import copystat as shutil_copystat
    source_name = str(src)
    dest_name = str(dest)
    shutil_copystat(source_name, dest_name)
def copychunked(src, dest):
    """ copy the content of ``src`` to ``dest`` in fixed-size pieces.

    Both arguments must offer ``open(mode)``; the source is opened with
    'rb' and the destination with 'wb'.  Both files are always closed.
    """
    blocksize = 524288 # half a meg of bytes
    infile = src.open('rb')
    try:
        outfile = dest.open('wb')
        try:
            block = infile.read(blocksize)
            while block:
                outfile.write(block)
                block = infile.read(blocksize)
        finally:
            outfile.close()
    finally:
        infile.close()
1024def isimportable(name):
1025 if name and (name[0].isalpha() or name[0] == '_'):
1026 name = name.replace("_", '')
1027 return not name or name.isalnum()