Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import atexit 

2import fnmatch 

3import itertools 

4import os 

5import shutil 

6import sys 

7import uuid 

8import warnings 

9from functools import partial 

10from os.path import expanduser 

11from os.path import expandvars 

12from os.path import isabs 

13from os.path import sep 

14from posixpath import sep as posix_sep 

15from typing import Iterable 

16from typing import Iterator 

17from typing import Set 

18from typing import TypeVar 

19from typing import Union 

20 

21from _pytest.warning_types import PytestWarning 

22 

# Use the standard-library pathlib on Python 3.6 and newer, and the pathlib2
# package on anything older; either way this module exposes Path and PurePath.
if sys.version_info >= (3, 6):
    from pathlib import Path, PurePath
else:
    from pathlib2 import Path, PurePath

# Names re-exported by ``from <this module> import *``.
__all__ = ["Path", "PurePath"]


# 10800 -- presumably seconds (three hours); confirm against the callers that
# pass it on as a lock timeout.
LOCK_TIMEOUT = 3 * 60 * 60

32 

33 

# Type variable bound to PurePath so that a helper annotated with it returns
# the same path type it was given.
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)


def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
    """Return the ``.lock`` entry located directly inside ``path``."""
    return path / ".lock"

39 

40 

def ensure_reset_dir(path: Path) -> None:
    """Make ``path`` an empty directory.

    Whatever already exists at ``path`` is removed with ``rm_rf`` first, then
    the directory is created anew.
    """
    already_present = path.exists()
    if already_present:
        rm_rf(path)
    path.mkdir()

48 

49 

def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool:
    """Error callback for ``shutil.rmtree`` that recovers from read-only entries.

    :param func: the function that failed while removing ``path``
    :param path: the entry that could not be removed
    :param exc: exception info; only its first two items (type, value) are used
    :param start_path: the path originally passed to ``rm_rf``; fixing up parent
        permissions stops once this directory has been handled
    :returns: True when the removal was retried after a chmod, False otherwise.
        The returned value is used only by our own tests.
    """
    import stat

    exc_type, exc_value = exc[:2]

    # another process removed the file in the middle of the "rm_rf" (xdist for example)
    # more context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
    if isinstance(exc_value, FileNotFoundError):
        return False

    # Only permission problems can be repaired here; report everything else.
    if not isinstance(exc_value, PermissionError):
        message = "(rm_rf) error removing {}\n{}: {}".format(
            path, exc_type, exc_value
        )
        warnings.warn(PytestWarning(message))
        return False

    removal_functions = (os.rmdir, os.remove, os.unlink)
    if func not in removal_functions:
        # A failing os.open is ignored silently; any other function is reported.
        if func not in (os.open,):
            message = "(rm_rf) unknown function {} when removing {}:\n{}: {}".format(
                func, path, exc_type, exc_value
            )
            warnings.warn(PytestWarning(message))
        return False

    def make_user_rw(target: str) -> None:
        """Add owner read and write permission to ``target``."""
        current_mode = os.stat(target).st_mode
        os.chmod(target, current_mode | stat.S_IRUSR | stat.S_IWUSR)

    # For files, walk upwards through the parent directories so that each of
    # them is writable as well.
    entry = Path(path)
    if entry.is_file():
        for ancestor in entry.parents:
            make_user_rw(str(ancestor))
            # stop when we reach the original path passed to rm_rf
            if ancestor == start_path:
                break
    make_user_rw(str(path))

    # Retry the operation that failed.
    func(path)
    return True

101 

102 

def rm_rf(path: Path) -> None:
    """Recursively delete ``path``, coping with read-only elements.

    Removal errors are handed to ``on_rm_rf_error``, which receives ``path``
    as its ``start_path``.
    """
    error_handler = partial(on_rm_rf_error, start_path=path)
    target = str(path)
    shutil.rmtree(target, onerror=error_handler)

109 

110 

def find_prefixed(root: Path, prefix: str) -> Iterator[Path]:
    """Yield the direct children of ``root`` whose name starts with ``prefix``.

    The comparison ignores case: both the prefix and each name are lowercased.
    """
    wanted = prefix.lower()
    yield from (
        entry for entry in root.iterdir() if entry.name.lower().startswith(wanted)
    )

117 

118 

def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]:
    """Yield what follows ``prefix`` in the name of each given path.

    Only the length of ``prefix`` is used: that many leading characters are
    cut from every name, without checking that they actually match.

    :param iter: iterable of paths
    :param prefix: expected prefix of the path names
    :returns: the parts of the path names following the prefix
    """
    skip = len(prefix)
    yield from (entry.name[skip:] for entry in iter)

128 

129 

def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
    """Return an iterator over the name suffixes of the entries of ``root``
    that start with ``prefix``.

    Combines ``find_prefixed`` and ``extract_suffixes``.
    """
    matching = find_prefixed(root, prefix)
    return extract_suffixes(matching, prefix)

134 

135 

def parse_num(maybe_num) -> int:
    """Convert a numeric path suffix to an int; -1 stands for "not a number".

    Only ``ValueError`` is treated as "not a number"; other errors raised by
    ``int()`` propagate.
    """
    try:
        number = int(maybe_num)
    except ValueError:
        number = -1
    return number

142 

143 

def _force_symlink(
    root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
) -> None:
    """Best-effort helper that (re)creates the symlink ``root/target``.

    The link is removed if present and then created pointing at ``link_to``.

    This is full of race conditions, which are reasonably ok to ignore in the
    context of best-effort linking to the latest test run: the presumption is
    that with much parallelism the inaccuracy is going to be acceptable.
    """
    link = root / target

    # Drop the previous link; failing to remove it (e.g. it is not there) is fine.
    try:
        link.unlink()
    except OSError:
        pass

    # Creating the link is best effort as well: every failure is ignored.
    try:
        link.symlink_to(link_to)
    except Exception:
        pass

164 

165 

def make_numbered_dir(root: Path, prefix: str) -> Path:
    """Create ``root/<prefix><N>`` where N is one above the highest existing number.

    After a successful creation the ``<prefix>current`` symlink in ``root`` is
    pointed at the new directory.

    :returns: the path of the newly created directory
    :raises EnvironmentError: if no directory could be created in 10 attempts
    """
    for _attempt in range(10):
        # try up to 10 times to create the folder
        highest = max(
            (parse_num(suffix) for suffix in find_suffixes(root, prefix)), default=-1
        )
        candidate = root.joinpath("{}{}".format(prefix, highest + 1))
        try:
            candidate.mkdir()
        except Exception:
            # creation failed (any error); recompute the number and try again
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate

    raise EnvironmentError(
        "could not create numbered dir with prefix "
        "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
    )

185 

186 

def create_cleanup_lock(p: Path) -> Path:
    """Create a lock file to prevent premature folder cleanup.

    The lock file is created exclusively (``O_CREAT | O_EXCL``) and the id of
    the current process is written into it.

    :param p: the directory to protect
    :returns: the path of the newly created lock file
    :raises EnvironmentError: if the lock file already exists, or if it is no
        longer a file right after it was written
    """
    lock_path = get_lock_path(p)
    try:
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise EnvironmentError("cannot create lockfile in {path}".format(path=p)) from e

    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # release the descriptor even when the write fails, so it cannot leak
        os.close(fd)

    if not lock_path.is_file():
        raise EnvironmentError("lock path got renamed after successful creation")
    return lock_path

202 

203 

def register_cleanup_lock_removal(lock_path: Path, register=atexit.register):
    """Register a callback that removes ``lock_path``, by default via atexit.

    :param lock_path: the lock file to remove
    :param register: callable that receives the cleanup function
    :returns: whatever ``register`` returns
    """
    owner_pid = os.getpid()

    def cleanup_on_exit(
        lock_path: Path = lock_path, original_pid: int = owner_pid
    ) -> None:
        # Only the process that registered the callback removes the lock; a
        # different pid means we are running in a fork.
        if os.getpid() == original_pid:
            try:
                lock_path.unlink()
            except (OSError, IOError):
                pass

    return register(cleanup_on_exit)

219 

220 

def maybe_delete_a_numbered_dir(path: Path) -> None:
    """Remove a numbered directory if its lock can be obtained and it does not
    seem to be in use.

    The directory is first renamed to a uniquely named ``garbage-<uuid>``
    sibling and that renamed directory is then removed.
    """
    lock_path = None
    try:
        lock_path = create_cleanup_lock(path)
        garbage = path.parent.joinpath("garbage-{}".format(uuid.uuid4()))
        path.rename(garbage)
        rm_rf(garbage)
    except (OSError, EnvironmentError):
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        pass
    finally:
        # if we created the lock, ensure we remove it even if we failed
        # to properly remove the numbered dir
        if lock_path is not None:
            try:
                lock_path.unlink()
            except (OSError, IOError):
                pass

245 

246 

def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
    """Check whether ``path`` may be deleted, breaking its lock if it is considered dead.

    :param path: the directory to examine
    :param consider_lock_dead_if_created_before: a lock whose modification
        time is older than this timestamp is treated as dead and removed
    :returns: False for symlinks, for locks that are still alive, and when the
        lock cannot be inspected or removed; True when there is no lock or a
        dead lock was removed
    """
    if path.is_symlink():
        return False
    lock = get_lock_path(path)
    if not lock.exists():
        return True
    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        return False

    if lock_time >= consider_lock_dead_if_created_before:
        # the lock is recent enough to be considered alive
        return False

    try:
        lock.unlink()
    except OSError:
        # Removing the dead lock can fail, e.g. because another process got
        # here first or the permissions changed; treat the directory as not
        # deletable instead of letting the error abort the caller's cleanup.
        return False
    return True

264 

265 

def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
    """Try to clean up ``path``, but only if ``ensure_deletable`` allows it."""
    if not ensure_deletable(path, consider_lock_dead_if_created_before):
        return
    maybe_delete_a_numbered_dir(path)

270 

271 

def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
    """List the numbered directories that are candidates for removal - follows py.path.

    A prefixed entry is a candidate when its number is at most the highest
    existing number minus ``keep``. Suffixes that are not numbers count as -1.
    """
    newest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    threshold = newest - keep
    prefix_length = len(prefix)
    for candidate in find_prefixed(root, prefix):
        # the number is whatever follows the prefix in the entry's name
        number = parse_num(candidate.name[prefix_length:])
        if number <= threshold:
            yield candidate

282 

283 

def cleanup_numbered_dir(
    root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
    """Clean up lock-driven numbered directories below ``root``.

    First the removal candidates for ``prefix`` are tried, then every
    ``garbage-*`` entry found in ``root``.
    """
    cutoff = consider_lock_dead_if_created_before
    for stale in cleanup_candidates(root, prefix, keep):
        try_cleanup(stale, cutoff)
    for leftover in root.glob("garbage-*"):
        try_cleanup(leftover, cutoff)

292 

293 

def make_numbered_dir_with_cleanup(
    root: Path, prefix: str, keep: int, lock_timeout: float
) -> Path:
    """Create a numbered dir protected by a cleanup lock and remove old ones.

    Creating the directory, locking it and registering the lock removal is
    attempted up to 10 times; the error of the last failed attempt is raised
    when none succeeds.

    :param keep: forwarded to the cleanup of older numbered directories
    :param lock_timeout: locks older than the new directory's modification
        time minus this value are considered dead during cleanup
    :returns: the path of the new directory
    """
    last_error = None
    for _attempt in range(10):
        try:
            new_dir = make_numbered_dir(root, prefix)
            lock_path = create_cleanup_lock(new_dir)
            register_cleanup_lock_removal(lock_path)
        except Exception as exc:
            last_error = exc
            continue

        cutoff = new_dir.stat().st_mtime - lock_timeout
        cleanup_numbered_dir(
            root=root,
            prefix=prefix,
            keep=keep,
            consider_lock_dead_if_created_before=cutoff,
        )
        return new_dir

    assert last_error is not None
    raise last_error

317 

318 

def resolve_from_str(input, root):
    """Turn the string ``input`` into a Path, relative to ``root`` if needed.

    ``~`` and environment variables in ``input`` are expanded first. An
    absolute result is returned as is; anything else is joined onto ``root``.
    """
    assert not isinstance(input, Path), "would break on py2"
    base = Path(root)
    expanded = expandvars(expanduser(input))
    return Path(expanded) if isabs(expanded) else base.joinpath(expanded)

328 

329 

def fnmatch_ex(pattern: str, path) -> bool:
    """FNMatcher port from py.path.common which works with PurePath() instances.

    Unlike PurePath.match(), which matches "**" glob expressions against each
    part of the path, this algorithm matches against the whole path.

    For example:
        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
        with this algorithm, but not with PurePath.match().

    The algorithm was ported to keep backward-compatibility with existing
    settings which assume paths match according to this logic.

    References:
    * https://bugs.python.org/issue29249
    * https://bugs.python.org/issue34731
    """
    pure = PurePath(path)
    on_windows = sys.platform.startswith("win")

    if on_windows and posix_sep in pattern and sep not in pattern:
        # Running on Windows with a pattern that only uses Posix path
        # separators: translate them to the Windows path separator.
        pattern = pattern.replace(posix_sep, sep)

    if sep in pattern:
        # A pattern with separators is matched against the whole path.
        candidate = str(pure)
        if pure.is_absolute() and not os.path.isabs(pattern):
            # let a relative pattern match anywhere inside an absolute path
            pattern = "*{}{}".format(os.sep, pattern)
    else:
        # A pattern without separators only has to match the final component.
        candidate = pure.name
    return fnmatch.fnmatch(candidate, pattern)

363 

364 

def parts(s: str) -> Set[str]:
    """Return every leading sub-path of ``s``, including ``s`` itself.

    ``s`` is split on the path separator; an empty leading sub-path (as found
    at the start of a path that begins with the separator) is reported as the
    separator itself.
    """
    components = s.split(sep)
    prefixes = set()
    for end in range(1, len(components) + 1):
        prefixes.add(sep.join(components[:end]) or sep)
    return prefixes