Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" basic collect and runtest protocol implementations """ 

2import bdb 

3import os 

4import sys 

5from time import time 

6from typing import Callable 

7from typing import Dict 

8from typing import List 

9from typing import Optional 

10from typing import Tuple 

11 

12import attr 

13 

14from .reports import CollectErrorRepr 

15from .reports import CollectReport 

16from .reports import TestReport 

17from _pytest._code.code import ExceptionInfo 

18from _pytest._code.code import ExceptionRepr 

19from _pytest.compat import TYPE_CHECKING 

20from _pytest.nodes import Collector 

21from _pytest.nodes import Node 

22from _pytest.outcomes import Exit 

23from _pytest.outcomes import Skipped 

24from _pytest.outcomes import TEST_OUTCOME 

25 

26if TYPE_CHECKING: 

27 from typing import Type 

28 

29# 

30# pytest plugin hooks 

31 

32 

33def pytest_addoption(parser): 

34 group = parser.getgroup("terminal reporting", "reporting", after="general") 

35 group.addoption( 

36 "--durations", 

37 action="store", 

38 type=int, 

39 default=None, 

40 metavar="N", 

41 help="show N slowest setup/test durations (N=0 for all).", 

42 ), 

43 

44 

45def pytest_terminal_summary(terminalreporter): 

46 durations = terminalreporter.config.option.durations 

47 verbose = terminalreporter.config.getvalue("verbose") 

48 if durations is None: 

49 return 

50 tr = terminalreporter 

51 dlist = [] 

52 for replist in tr.stats.values(): 

53 for rep in replist: 

54 if hasattr(rep, "duration"): 

55 dlist.append(rep) 

56 if not dlist: 

57 return 

58 dlist.sort(key=lambda x: x.duration) 

59 dlist.reverse() 

60 if not durations: 

61 tr.write_sep("=", "slowest test durations") 

62 else: 

63 tr.write_sep("=", "slowest %s test durations" % durations) 

64 dlist = dlist[:durations] 

65 

66 for rep in dlist: 

67 if verbose < 2 and rep.duration < 0.005: 

68 tr.write_line("") 

69 tr.write_line("(0.00 durations hidden. Use -vv to show these durations.)") 

70 break 

71 tr.write_line("{:02.2f}s {:<8} {}".format(rep.duration, rep.when, rep.nodeid)) 

72 

73 

74def pytest_sessionstart(session): 

75 session._setupstate = SetupState() 

76 

77 

78def pytest_sessionfinish(session): 

79 session._setupstate.teardown_all() 

80 

81 

82def pytest_runtest_protocol(item, nextitem): 

83 item.ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location) 

84 runtestprotocol(item, nextitem=nextitem) 

85 item.ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location) 

86 return True 

87 

88 

89def runtestprotocol(item, log=True, nextitem=None): 

90 hasrequest = hasattr(item, "_request") 

91 if hasrequest and not item._request: 

92 item._initrequest() 

93 rep = call_and_report(item, "setup", log) 

94 reports = [rep] 

95 if rep.passed: 

96 if item.config.getoption("setupshow", False): 

97 show_test_item(item) 

98 if not item.config.getoption("setuponly", False): 

99 reports.append(call_and_report(item, "call", log)) 

100 reports.append(call_and_report(item, "teardown", log, nextitem=nextitem)) 

101 # after all teardown hooks have been called 

102 # want funcargs and request info to go away 

103 if hasrequest: 

104 item._request = False 

105 item.funcargs = None 

106 return reports 

107 

108 

109def show_test_item(item): 

110 """Show test function, parameters and the fixtures of the test item.""" 

111 tw = item.config.get_terminal_writer() 

112 tw.line() 

113 tw.write(" " * 8) 

114 tw.write(item.nodeid) 

115 used_fixtures = sorted(getattr(item, "fixturenames", [])) 

116 if used_fixtures: 

117 tw.write(" (fixtures used: {})".format(", ".join(used_fixtures))) 

118 

119 

120def pytest_runtest_setup(item): 

121 _update_current_test_var(item, "setup") 

122 item.session._setupstate.prepare(item) 

123 

124 

125def pytest_runtest_call(item): 

126 _update_current_test_var(item, "call") 

127 try: 

128 del sys.last_type 

129 del sys.last_value 

130 del sys.last_traceback 

131 except AttributeError: 

132 pass 

133 try: 

134 item.runtest() 

135 except Exception: 

136 # Store trace info to allow postmortem debugging 

137 type, value, tb = sys.exc_info() 

138 assert tb is not None 

139 tb = tb.tb_next # Skip *this* frame 

140 sys.last_type = type 

141 sys.last_value = value 

142 sys.last_traceback = tb 

143 del type, value, tb # Get rid of these in this frame 

144 raise 

145 

146 

147def pytest_runtest_teardown(item, nextitem): 

148 _update_current_test_var(item, "teardown") 

149 item.session._setupstate.teardown_exact(item, nextitem) 

150 _update_current_test_var(item, None) 

151 

152 

153def _update_current_test_var(item, when): 

154 """ 

155 Update PYTEST_CURRENT_TEST to reflect the current item and stage. 

156 

157 If ``when`` is None, delete PYTEST_CURRENT_TEST from the environment. 

158 """ 

159 var_name = "PYTEST_CURRENT_TEST" 

160 if when: 

161 value = "{} ({})".format(item.nodeid, when) 

162 # don't allow null bytes on environment variables (see #2644, #2957) 

163 value = value.replace("\x00", "(null)") 

164 os.environ[var_name] = value 

165 else: 

166 os.environ.pop(var_name) 

167 

168 

169def pytest_report_teststatus(report): 

170 if report.when in ("setup", "teardown"): 

171 if report.failed: 

172 # category, shortletter, verbose-word 

173 return "error", "E", "ERROR" 

174 elif report.skipped: 

175 return "skipped", "s", "SKIPPED" 

176 else: 

177 return "", "", "" 

178 

179 

180# 

181# Implementation 

182 

183 

184def call_and_report(item, when, log=True, **kwds): 

185 call = call_runtest_hook(item, when, **kwds) 

186 hook = item.ihook 

187 report = hook.pytest_runtest_makereport(item=item, call=call) 

188 if log: 

189 hook.pytest_runtest_logreport(report=report) 

190 if check_interactive_exception(call, report): 

191 hook.pytest_exception_interact(node=item, call=call, report=report) 

192 return report 

193 

194 

195def check_interactive_exception(call, report): 

196 return call.excinfo and not ( 

197 hasattr(report, "wasxfail") 

198 or call.excinfo.errisinstance(Skipped) 

199 or call.excinfo.errisinstance(bdb.BdbQuit) 

200 ) 

201 

202 

203def call_runtest_hook(item, when, **kwds): 

204 hookname = "pytest_runtest_" + when 

205 ihook = getattr(item.ihook, hookname) 

206 reraise = (Exit,) # type: Tuple[Type[BaseException], ...] 

207 if not item.config.getoption("usepdb", False): 

208 reraise += (KeyboardInterrupt,) 

209 return CallInfo.from_call( 

210 lambda: ihook(item=item, **kwds), when=when, reraise=reraise 

211 ) 

212 

213 

214@attr.s(repr=False) 

215class CallInfo: 

216 """ Result/Exception info a function invocation. """ 

217 

218 _result = attr.ib() 

219 excinfo = attr.ib(type=Optional[ExceptionInfo]) 

220 start = attr.ib() 

221 stop = attr.ib() 

222 when = attr.ib() 

223 

224 @property 

225 def result(self): 

226 if self.excinfo is not None: 

227 raise AttributeError("{!r} has no valid result".format(self)) 

228 return self._result 

229 

230 @classmethod 

231 def from_call(cls, func, when, reraise=None) -> "CallInfo": 

232 #: context of invocation: one of "setup", "call", 

233 #: "teardown", "memocollect" 

234 start = time() 

235 excinfo = None 

236 try: 

237 result = func() 

238 except: # noqa 

239 excinfo = ExceptionInfo.from_current() 

240 if reraise is not None and excinfo.errisinstance(reraise): 

241 raise 

242 result = None 

243 stop = time() 

244 return cls(start=start, stop=stop, when=when, result=result, excinfo=excinfo) 

245 

246 def __repr__(self): 

247 if self.excinfo is None: 

248 return "<CallInfo when={!r} result: {!r}>".format(self.when, self._result) 

249 return "<CallInfo when={!r} excinfo={!r}>".format(self.when, self.excinfo) 

250 

251 

252def pytest_runtest_makereport(item, call): 

253 return TestReport.from_item_and_call(item, call) 

254 

255 

256def pytest_make_collect_report(collector: Collector) -> CollectReport: 

257 call = CallInfo.from_call(lambda: list(collector.collect()), "collect") 

258 longrepr = None 

259 if not call.excinfo: 

260 outcome = "passed" 

261 else: 

262 skip_exceptions = [Skipped] 

263 unittest = sys.modules.get("unittest") 

264 if unittest is not None: 

265 # Type ignored because unittest is loaded dynamically. 

266 skip_exceptions.append(unittest.SkipTest) # type: ignore 

267 if call.excinfo.errisinstance(tuple(skip_exceptions)): 

268 outcome = "skipped" 

269 r_ = collector._repr_failure_py(call.excinfo, "line") 

270 assert isinstance(r_, ExceptionRepr), r_ 

271 r = r_.reprcrash 

272 assert r 

273 longrepr = (str(r.path), r.lineno, r.message) 

274 else: 

275 outcome = "failed" 

276 errorinfo = collector.repr_failure(call.excinfo) 

277 if not hasattr(errorinfo, "toterminal"): 

278 errorinfo = CollectErrorRepr(errorinfo) 

279 longrepr = errorinfo 

280 rep = CollectReport( 

281 collector.nodeid, outcome, longrepr, getattr(call, "result", None) 

282 ) 

283 rep.call = call # type: ignore # see collect_one_node 

284 return rep 

285 

286 

287class SetupState: 

288 """ shared state for setting up/tearing down test items or collectors. """ 

289 

290 def __init__(self): 

291 self.stack = [] # type: List[Node] 

292 self._finalizers = {} # type: Dict[Node, List[Callable[[], None]]] 

293 

294 def addfinalizer(self, finalizer, colitem): 

295 """ attach a finalizer to the given colitem. """ 

296 assert colitem and not isinstance(colitem, tuple) 

297 assert callable(finalizer) 

298 # assert colitem in self.stack # some unit tests don't setup stack :/ 

299 self._finalizers.setdefault(colitem, []).append(finalizer) 

300 

301 def _pop_and_teardown(self): 

302 colitem = self.stack.pop() 

303 self._teardown_with_finalization(colitem) 

304 

305 def _callfinalizers(self, colitem): 

306 finalizers = self._finalizers.pop(colitem, None) 

307 exc = None 

308 while finalizers: 

309 fin = finalizers.pop() 

310 try: 

311 fin() 

312 except TEST_OUTCOME: 

313 # XXX Only first exception will be seen by user, 

314 # ideally all should be reported. 

315 if exc is None: 

316 exc = sys.exc_info() 

317 if exc: 

318 _, val, tb = exc 

319 assert val is not None 

320 raise val.with_traceback(tb) 

321 

322 def _teardown_with_finalization(self, colitem): 

323 self._callfinalizers(colitem) 

324 colitem.teardown() 

325 for colitem in self._finalizers: 

326 assert colitem in self.stack 

327 

328 def teardown_all(self): 

329 while self.stack: 

330 self._pop_and_teardown() 

331 for key in list(self._finalizers): 

332 self._teardown_with_finalization(key) 

333 assert not self._finalizers 

334 

335 def teardown_exact(self, item, nextitem): 

336 needed_collectors = nextitem and nextitem.listchain() or [] 

337 self._teardown_towards(needed_collectors) 

338 

339 def _teardown_towards(self, needed_collectors): 

340 exc = None 

341 while self.stack: 

342 if self.stack == needed_collectors[: len(self.stack)]: 

343 break 

344 try: 

345 self._pop_and_teardown() 

346 except TEST_OUTCOME: 

347 # XXX Only first exception will be seen by user, 

348 # ideally all should be reported. 

349 if exc is None: 

350 exc = sys.exc_info() 

351 if exc: 

352 _, val, tb = exc 

353 assert val is not None 

354 raise val.with_traceback(tb) 

355 

356 def prepare(self, colitem): 

357 """ setup objects along the collector chain to the test-method 

358 and teardown previously setup objects.""" 

359 needed_collectors = colitem.listchain() 

360 self._teardown_towards(needed_collectors) 

361 

362 # check if the last collection node has raised an error 

363 for col in self.stack: 

364 if hasattr(col, "_prepare_exc"): 

365 _, val, tb = col._prepare_exc 

366 raise val.with_traceback(tb) 

367 for col in needed_collectors[len(self.stack) :]: 

368 self.stack.append(col) 

369 try: 

370 col.setup() 

371 except TEST_OUTCOME: 

372 col._prepare_exc = sys.exc_info() 

373 raise 

374 

375 

376def collect_one_node(collector): 

377 ihook = collector.ihook 

378 ihook.pytest_collectstart(collector=collector) 

379 rep = ihook.pytest_make_collect_report(collector=collector) 

380 call = rep.__dict__.pop("call", None) 

381 if call and check_interactive_exception(call, rep): 

382 ihook.pytest_exception_interact(node=collector, call=call, report=rep) 

383 return rep