Coverage for /usr/local/lib/python3.7/site-packages/_pytest/pathlib.py : 6%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import atexit
2import fnmatch
3import itertools
4import os
5import shutil
6import sys
7import uuid
8import warnings
9from functools import partial
10from os.path import expanduser
11from os.path import expandvars
12from os.path import isabs
13from os.path import sep
14from posixpath import sep as posix_sep
15from typing import Iterable
16from typing import Iterator
17from typing import Set
18from typing import TypeVar
19from typing import Union
21from _pytest.warning_types import PytestWarning
23if sys.version_info[:2] >= (3, 6):
24 from pathlib import Path, PurePath
25else:
26 from pathlib2 import Path, PurePath
# names re-exported from this module
__all__ = ["Path", "PurePath"]

# three hours; presumably expressed in seconds -- the constant is not
# referenced by the code visible here, confirm against its users
LOCK_TIMEOUT = 3 * 60 * 60

# lets a helper declare that it returns the same PurePath subclass it received
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)
def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
    """returns the location of the ".lock" entry directly inside the given path"""
    lock_name = ".lock"
    return path / lock_name
def ensure_reset_dir(path: Path) -> None:
    """
    makes the given path an empty directory, removing any existing content first
    """
    already_there = path.exists()
    if already_there:
        # wipe the old tree (rm_rf also copes with read-only entries)
        rm_rf(path)
    path.mkdir()
def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool:
    """Error callback used by rm_rf to recover from read-only entries.

    :param func: the function that failed while removing ``path``
    :param path: the entry that could not be removed
    :param exc: exception info sequence; only the first two items
        (type and value) are used
    :param start_path: the path originally passed to rm_rf; the upward
        chmod walk stops there
    :returns: True if the permissions were changed and ``func`` was retried,
        False otherwise. The returned value is used only by our own tests.
    """
    error_type, error = exc[:2]

    # another process removed the file in the middle of the "rm_rf" (xdist for example)
    # more context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
    if isinstance(error, FileNotFoundError):
        return False

    if not isinstance(error, PermissionError):
        message = "(rm_rf) error removing {}\n{}: {}".format(path, error_type, error)
        warnings.warn(PytestWarning(message))
        return False

    retryable_functions = (os.rmdir, os.remove, os.unlink)
    if func not in retryable_functions:
        # failures of os.open are skipped silently, anything else is reported
        silent_functions = (os.open,)
        if func not in silent_functions:
            message = "(rm_rf) unknown function {} when removing {}:\n{}: {}".format(
                func, path, error_type, error
            )
            warnings.warn(PytestWarning(message))
        return False

    # Chmod + retry.
    import stat

    owner_rw = stat.S_IRUSR | stat.S_IWUSR

    def chmod_rw(p: str) -> None:
        os.chmod(p, os.stat(p).st_mode | owner_rw)

    # A file can only be removed when the directories above it are writable,
    # so make every ancestor writable, stopping once the path originally
    # passed to rm_rf has been handled.
    target = Path(path)
    if target.is_file():
        for ancestor in target.parents:
            chmod_rw(str(ancestor))
            if ancestor == start_path:
                break
    chmod_rw(str(path))

    func(path)
    return True
def rm_rf(path: Path) -> None:
    """Recursively delete the given path, including read-only elements.

    Removal errors are routed to on_rm_rf_error, which is told where the
    removal started via ``start_path``.
    """
    shutil.rmtree(str(path), onerror=partial(on_rm_rf_error, start_path=path))
def find_prefixed(root: Path, prefix: str) -> Iterator[Path]:
    """yields the entries of root whose name starts with the prefix, ignoring case"""
    wanted = prefix.lower()
    yield from (
        entry for entry in root.iterdir() if entry.name.lower().startswith(wanted)
    )
def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]:
    """
    strips the first ``len(prefix)`` characters from the name of every path

    :param iter: iterator over path names
    :param prefix: expected prefix of the path names (only its length is used)
    :returns: the parts of the path names following the prefix
    """
    skip = len(prefix)
    yield from (entry.name[skip:] for entry in iter)
def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
    """combines find_prefixed and extract_suffixes:
    the name suffixes of all entries in root that start with the prefix
    """
    matching = find_prefixed(root, prefix)
    return extract_suffixes(matching, prefix)
def parse_num(maybe_num) -> int:
    """converts a numeric path suffix to an int, -1 if int() rejects the value"""
    try:
        number = int(maybe_num)
    except ValueError:
        number = -1
    return number
144def _force_symlink(
145 root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
146) -> None:
147 """helper to create the current symlink
149 it's full of race conditions that are reasonably ok to ignore
150 for the context of best effort linking to the latest test run
152 the presumption being that in case of much parallelism
153 the inaccuracy is going to be acceptable
154 """
155 current_symlink = root.joinpath(target)
156 try:
157 current_symlink.unlink()
158 except OSError:
159 pass
160 try:
161 current_symlink.symlink_to(link_to)
162 except Exception:
163 pass
def make_numbered_dir(root: Path, prefix: str) -> Path:
    """creates ``root/<prefix><n>`` where n is one above the highest existing number

    on success the ``<prefix>current`` symlink is pointed at the new directory

    :raises EnvironmentError: if no directory could be created in 10 attempts
    """
    for _ in range(10):
        # rescan on every attempt so a freshly taken number is skipped
        highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
        candidate = root.joinpath("{}{}".format(prefix, highest + 1))
        try:
            candidate.mkdir()
        except Exception:
            # presumably the number was claimed concurrently; try again
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate
    raise EnvironmentError(
        "could not create numbered dir with prefix "
        "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
    )
def create_cleanup_lock(p: Path) -> Path:
    """creates a lock to prevent premature folder cleanup

    the lock file is created exclusively (``O_CREAT | O_EXCL``) and the pid of
    the current process is written into it

    :param p: the directory to protect
    :returns: the path of the created lock file
    :raises EnvironmentError: if the lock file already exists, or if it is
        not a regular file anymore right after it was written
    """
    lock_path = get_lock_path(p)
    try:
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise EnvironmentError("cannot create lockfile in {path}".format(path=p)) from e
    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # release the descriptor even when writing the pid fails
        os.close(fd)
    if not lock_path.is_file():
        raise EnvironmentError("lock path got renamed after successful creation")
    return lock_path
def register_cleanup_lock_removal(lock_path: Path, register=atexit.register):
    """registers a function that removes the lock, by default via atexit

    :param lock_path: the lock file to remove
    :param register: callable receiving the cleanup function
    :returns: whatever ``register`` returns
    """
    owner_pid = os.getpid()

    def cleanup_on_exit(
        lock_path: Path = lock_path, original_pid: int = owner_pid
    ) -> None:
        # a different pid means we are in a fork: leave the lock alone
        if os.getpid() == original_pid:
            try:
                lock_path.unlink()
            except OSError:
                pass

    return register(cleanup_on_exit)
def maybe_delete_a_numbered_dir(path: Path) -> None:
    """removes a numbered directory if its lock can be obtained and it does not seem to be in use

    the directory is first renamed to a unique ``garbage-<uuid>`` sibling and
    then removed recursively
    """
    acquired_lock = None
    try:
        acquired_lock = create_cleanup_lock(path)
        trash = path.parent.joinpath("garbage-{}".format(uuid.uuid4()))
        path.rename(trash)
        rm_rf(trash)
    except (OSError, EnvironmentError):
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        pass
    finally:
        # if we created the lock, ensure we remove it even if we failed
        # to properly remove the numbered dir
        if acquired_lock is not None:
            try:
                acquired_lock.unlink()
            except (OSError, IOError):
                pass
def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
    """checks if a lock exists and breaks it if its considered dead

    :param path: the directory to check
    :param consider_lock_dead_if_created_before: a lock whose mtime is below
        this timestamp is treated as dead and removed
    :returns: True if the path has no lock, or a dead lock that was removed;
        False for symlinks, for live locks, and whenever the lock cannot be
        inspected or removed
    """
    if path.is_symlink():
        return False
    lock = get_lock_path(path)
    try:
        if not lock.exists():
            return True
    except OSError:
        # the lock cannot even be inspected (e.g. permission denied):
        # do not treat the directory as deletable
        return False
    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        return False
    if lock_time >= consider_lock_dead_if_created_before:
        return False
    try:
        lock.unlink()
    except OSError:
        # e.g. another process removed the lock first, or its permissions
        # changed; either way this process did not break the lock
        return False
    return True
def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
    """deletes a folder, but only when ensure_deletable reports it as deletable"""
    deletable = ensure_deletable(path, consider_lock_dead_if_created_before)
    if not deletable:
        return
    maybe_delete_a_numbered_dir(path)
def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
    """lists candidates for numbered directories to be removed - follows py.path

    an entry is a candidate when its number is at most
    ``highest existing number - keep``; entries whose suffix is not a number
    count as -1
    """
    newest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    threshold = newest - keep
    skip = len(prefix)
    for candidate in find_prefixed(root, prefix):
        # the part of the name after the prefix is the directory number
        if parse_num(candidate.name[skip:]) <= threshold:
            yield candidate
def cleanup_numbered_dir(
    root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
    """cleanup for lock driven numbered directories

    tries to remove the outdated numbered directories first and then any
    ``garbage-*`` entry found in root
    """

    def attempt(candidates) -> None:
        for candidate in candidates:
            try_cleanup(candidate, consider_lock_dead_if_created_before)

    attempt(cleanup_candidates(root, prefix, keep))
    # "garbage-*" is the name maybe_delete_a_numbered_dir renames a directory
    # to before removing it
    attempt(root.glob("garbage-*"))
def make_numbered_dir_with_cleanup(
    root: Path, prefix: str, keep: int, lock_timeout: float
) -> Path:
    """creates a numbered dir with a cleanup lock and removes old ones

    :param root: directory holding the numbered directories
    :param prefix: name prefix of the numbered directories
    :param keep: passed on to cleanup_numbered_dir
    :param lock_timeout: subtracted from the new directory's mtime to get the
        timestamp before which existing locks are considered dead
    :returns: the newly created directory
    :raises Exception: the error of the last attempt, after 10 failed attempts
    """
    last_error = None
    for _ in range(10):
        try:
            p = make_numbered_dir(root, prefix)
            lock_path = create_cleanup_lock(p)
            register_cleanup_lock_removal(lock_path)
        except Exception as exc:
            last_error = exc
            continue
        cutoff = p.stat().st_mtime - lock_timeout
        cleanup_numbered_dir(
            root=root,
            prefix=prefix,
            keep=keep,
            consider_lock_dead_if_created_before=cutoff,
        )
        return p
    assert last_error is not None
    raise last_error
def resolve_from_str(input, root):
    """turns a path string into a Path, anchoring relative input at root

    ``~`` and environment variables in the input are expanded first; an
    absolute result is returned as is
    """
    assert not isinstance(input, Path), "would break on py2"
    base = Path(root)
    expanded = expandvars(expanduser(input))
    return Path(expanded) if isabs(expanded) else base.joinpath(expanded)
def fnmatch_ex(pattern: str, path) -> bool:
    """FNMatcher port from py.path.common which works with PurePath() instances.

    The difference between this algorithm and PurePath.match() is that the
    latter matches "**" glob expressions for each part of the path, while this
    algorithm uses the whole path instead.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
        with this algorithm, but not with PurePath.match().

    This algorithm was ported to keep backward-compatibility with existing
    settings which assume paths match according this logic.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731
    """
    path = PurePath(path)

    on_windows = sys.platform.startswith("win")
    if on_windows and posix_sep in pattern and sep not in pattern:
        # Running on Windows with a pattern that only uses Posix path
        # separators: convert them to the Windows path separator.
        pattern = pattern.replace(posix_sep, sep)

    if sep in pattern:
        # the pattern has directory parts: match it against the whole path
        name = str(path)
        if path.is_absolute() and not os.path.isabs(pattern):
            # let a relative pattern match at any depth of an absolute path
            pattern = "*{}{}".format(os.sep, pattern)
    else:
        # no separator in the pattern: only the last path component counts
        name = path.name
    return fnmatch.fnmatch(name, pattern)
def parts(s: str) -> Set[str]:
    """returns every leading sub-path of s, split on the path separator

    an empty leading sub-path (input that is empty or starts with the
    separator) is reported as the separator itself
    """
    pieces = s.split(sep)
    return {sep.join(pieces[:end]) or sep for end in range(1, len(pieces) + 1)}