Coverage for /usr/local/lib/python3.7/site-packages/_pytest/runner.py : 47%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1""" basic collect and runtest protocol implementations """
2import bdb
3import os
4import sys
5from time import time
6from typing import Callable
7from typing import Dict
8from typing import List
9from typing import Optional
10from typing import Tuple
12import attr
14from .reports import CollectErrorRepr
15from .reports import CollectReport
16from .reports import TestReport
17from _pytest._code.code import ExceptionInfo
18from _pytest._code.code import ExceptionRepr
19from _pytest.compat import TYPE_CHECKING
20from _pytest.nodes import Collector
21from _pytest.nodes import Node
22from _pytest.outcomes import Exit
23from _pytest.outcomes import Skipped
24from _pytest.outcomes import TEST_OUTCOME
26if TYPE_CHECKING:
27 from typing import Type
29#
30# pytest plugin hooks
def pytest_addoption(parser):
    """Register the ``--durations`` command line option.

    The option is added to the "terminal reporting" option group, which is
    requested from ``parser`` with the description "reporting" and placed
    after the "general" group.

    :param parser: object providing ``getgroup()``; the returned group must
        provide ``addoption()``.
    """
    group = parser.getgroup("terminal reporting", "reporting", after="general")
    # NOTE: the call is a plain statement; a trailing comma after the closing
    # parenthesis would wrap its result in a throw-away one-element tuple.
    group.addoption(
        "--durations",
        action="store",
        type=int,
        default=None,
        metavar="N",
        help="show N slowest setup/test durations (N=0 for all).",
    )
def pytest_terminal_summary(terminalreporter):
    """Write the "slowest test durations" section of the terminal summary.

    Nothing is written when the ``durations`` option is ``None`` or when no
    collected report carries a ``duration`` attribute.  Otherwise reports are
    listed from slowest to fastest, limited to the first ``durations`` entries
    when that option is non-zero.  Unless verbosity is at least 2, the listing
    stops at the first report faster than 0.005s and prints a hint instead.
    """
    config = terminalreporter.config
    limit = config.option.durations
    verbosity = config.getvalue("verbose")
    if limit is None:
        return

    timed_reports = [
        report
        for reports in terminalreporter.stats.values()
        for report in reports
        if hasattr(report, "duration")
    ]
    if not timed_reports:
        return

    # Ascending stable sort followed by a reversal: slowest first, and reports
    # with equal durations end up in reverse collection order.
    slowest_first = sorted(timed_reports, key=lambda report: report.duration)
    slowest_first.reverse()

    if limit:
        terminalreporter.write_sep("=", "slowest %s test durations" % limit)
        slowest_first = slowest_first[:limit]
    else:
        terminalreporter.write_sep("=", "slowest test durations")

    for report in slowest_first:
        too_fast_to_show = verbosity < 2 and report.duration < 0.005
        if too_fast_to_show:
            terminalreporter.write_line("")
            terminalreporter.write_line(
                "(0.00 durations hidden.  Use -vv to show these durations.)".replace(
                    "  ", " "
                )
            )
            break
        terminalreporter.write_line(
            "{:02.2f}s {:<8} {}".format(report.duration, report.when, report.nodeid)
        )
def pytest_sessionstart(session):
    """Give the session a fresh ``SetupState`` stored as ``_setupstate``."""
    setup_state = SetupState()
    session._setupstate = setup_state
def pytest_sessionfinish(session):
    """Tear down everything still tracked by the session's ``_setupstate``."""
    setup_state = session._setupstate
    setup_state.teardown_all()
def pytest_runtest_protocol(item, nextitem):
    """Run the full runtest protocol for ``item``, bracketed by log hooks.

    ``pytest_runtest_logstart`` is called before and
    ``pytest_runtest_logfinish`` after ``runtestprotocol()``; both receive the
    item's ``nodeid`` and ``location``.  Always returns ``True``.
    """
    item.ihook.pytest_runtest_logstart(
        nodeid=item.nodeid,
        location=item.location,
    )
    runtestprotocol(item, nextitem=nextitem)
    item.ihook.pytest_runtest_logfinish(
        nodeid=item.nodeid,
        location=item.location,
    )
    return True
def runtestprotocol(item, log=True, nextitem=None):
    """Run the setup, call and teardown phases for ``item``.

    :param item: the test item to run.
    :param log: forwarded to ``call_and_report()`` for every phase.
    :param nextitem: forwarded to the teardown phase.
    :return: list of the reports produced, in phase order.  The "call" report
        is only present when setup passed and the ``setuponly`` option is off.
    """
    has_request = hasattr(item, "_request")
    if has_request and not item._request:
        item._initrequest()

    setup_report = call_and_report(item, "setup", log)
    reports = [setup_report]
    if setup_report.passed:
        getoption = item.config.getoption
        if getoption("setupshow", False):
            show_test_item(item)
        if not getoption("setuponly", False):
            call_report = call_and_report(item, "call", log)
            reports.append(call_report)

    teardown_report = call_and_report(item, "teardown", log, nextitem=nextitem)
    reports.append(teardown_report)

    # Once every teardown hook has run, drop funcargs and request info.
    if has_request:
        item._request = False
        item.funcargs = None
    return reports
def show_test_item(item):
    """Write the item's node id and, if any, its sorted fixture names.

    Output goes to the terminal writer obtained from
    ``item.config.get_terminal_writer()``: a new line, eight spaces of
    indentation, the node id, and a "(fixtures used: ...)" suffix when the
    item has a non-empty ``fixturenames`` attribute.
    """
    writer = item.config.get_terminal_writer()
    writer.line()
    indent = " " * 8
    writer.write(indent)
    writer.write(item.nodeid)
    fixture_names = sorted(getattr(item, "fixturenames", []))
    if not fixture_names:
        return
    writer.write(" (fixtures used: {})".format(", ".join(fixture_names)))
def pytest_runtest_setup(item):
    """Mark the "setup" stage for ``item`` and prepare it via the setup state."""
    _update_current_test_var(item, "setup")
    setup_state = item.session._setupstate
    setup_state.prepare(item)
def pytest_runtest_call(item):
    """Run ``item.runtest()`` and record failure info for postmortem debugging.

    Any previous ``sys.last_type`` / ``sys.last_value`` /
    ``sys.last_traceback`` are deleted first.  If ``runtest()`` raises an
    ``Exception``, those attributes are set from the current exception (with
    this function's own frame skipped) and the exception is re-raised.
    """
    _update_current_test_var(item, "call")
    try:
        # Deletion stops at the first attribute that is missing.
        for stale in ("last_type", "last_value", "last_traceback"):
            delattr(sys, stale)
    except AttributeError:
        pass

    try:
        item.runtest()
    except Exception:
        # Store trace info to allow postmortem debugging
        exc_type, exc_value, traceback = sys.exc_info()
        assert traceback is not None
        traceback = traceback.tb_next  # Skip *this* frame
        sys.last_type = exc_type
        sys.last_value = exc_value
        sys.last_traceback = traceback
        # Do not keep references to the exception alive in this frame.
        del exc_type, exc_value, traceback
        raise
def pytest_runtest_teardown(item, nextitem):
    """Tear down ``item`` with respect to ``nextitem``.

    PYTEST_CURRENT_TEST is set to the "teardown" stage for the duration of
    the teardown and removed afterwards.
    """
    _update_current_test_var(item, "teardown")
    setup_state = item.session._setupstate
    setup_state.teardown_exact(item, nextitem)
    _update_current_test_var(item, None)
153def _update_current_test_var(item, when):
154 """
155 Update PYTEST_CURRENT_TEST to reflect the current item and stage.
157 If ``when`` is None, delete PYTEST_CURRENT_TEST from the environment.
158 """
159 var_name = "PYTEST_CURRENT_TEST"
160 if when:
161 value = "{} ({})".format(item.nodeid, when)
162 # don't allow null bytes on environment variables (see #2644, #2957)
163 value = value.replace("\x00", "(null)")
164 os.environ[var_name] = value
165 else:
166 os.environ.pop(var_name)
def pytest_report_teststatus(report):
    """Return the status triple for setup and teardown reports.

    :return: ``(category, shortletter, verbose-word)`` for reports whose
        ``when`` is "setup" or "teardown"; ``None`` for any other report.
    """
    if report.when not in ("setup", "teardown"):
        return None
    if report.failed:
        return "error", "E", "ERROR"
    if report.skipped:
        return "skipped", "s", "SKIPPED"
    # Passed setup/teardown phases are not reported on their own.
    return "", "", ""
180#
181# Implementation
def call_and_report(item, when, log=True, **kwds):
    """Run one runtest phase for ``item`` and build its report.

    :param item: the test item.
    :param when: phase name, passed on to ``call_runtest_hook()``.
    :param log: when true, the report is passed to
        ``pytest_runtest_logreport``.
    :param kwds: extra keyword arguments for the phase hook.
    :return: the report created by ``pytest_runtest_makereport``.
    """
    call_info = call_runtest_hook(item, when, **kwds)
    hooks = item.ihook
    report = hooks.pytest_runtest_makereport(item=item, call=call_info)
    if log:
        hooks.pytest_runtest_logreport(report=report)
    wants_interaction = check_interactive_exception(call_info, report)
    if wants_interaction:
        hooks.pytest_exception_interact(node=item, call=call_info, report=report)
    return report
def check_interactive_exception(call, report):
    """Tell whether the exception held by ``call`` warrants interaction.

    :return: the falsy ``call.excinfo`` itself when there is no exception;
        ``False`` when the report has a ``wasxfail`` attribute or the
        exception is a ``Skipped`` or ``bdb.BdbQuit``; ``True`` otherwise.
    """
    excinfo = call.excinfo
    if not excinfo:
        return excinfo
    if hasattr(report, "wasxfail"):
        return False
    if excinfo.errisinstance(Skipped):
        return False
    return not excinfo.errisinstance(bdb.BdbQuit)
def call_runtest_hook(item, when, **kwds):
    """Invoke the ``pytest_runtest_<when>`` hook for ``item`` inside a CallInfo.

    ``Exit`` always propagates out of the call; ``KeyboardInterrupt``
    propagates too unless the ``usepdb`` option is set.

    :return: the ``CallInfo`` produced by ``CallInfo.from_call()``.
    """
    hook = getattr(item.ihook, "pytest_runtest_" + when)
    if item.config.getoption("usepdb", False):
        reraise = (Exit,)  # type: Tuple[Type[BaseException], ...]
    else:
        reraise = (Exit, KeyboardInterrupt)

    def invoke():
        return hook(item=item, **kwds)

    return CallInfo.from_call(invoke, when=when, reraise=reraise)
@attr.s(repr=False)
class CallInfo:
    """Result or exception info of a single function invocation.

    Instances are normally created through :meth:`from_call`.
    """

    # Return value of the call; read it through the ``result`` property.
    _result = attr.ib()
    # Captured exception info, or None when the call did not raise.
    excinfo = attr.ib(type=Optional[ExceptionInfo])
    # ``time()`` readings taken right before and right after the call.
    start = attr.ib()
    stop = attr.ib()
    # Context of invocation as given by the caller of ``from_call``.
    when = attr.ib()

    @property
    def result(self):
        """The call's return value; raises AttributeError if the call raised."""
        if self.excinfo is None:
            return self._result
        raise AttributeError("{!r} has no valid result".format(self))

    @classmethod
    def from_call(cls, func, when, reraise=None) -> "CallInfo":
        """Call ``func()`` and capture its outcome.

        :param func: zero-argument callable to invoke.
        :param when: context of invocation, stored on the instance.
        :param reraise: exception type (or tuple of types) that is re-raised
            instead of being captured; ``None`` captures everything.
        """
        started = time()
        outcome = None
        failure = None
        try:
            outcome = func()
        except BaseException:
            failure = ExceptionInfo.from_current()
            if reraise is not None and failure.errisinstance(reraise):
                raise
        finished = time()
        return cls(
            start=started, stop=finished, when=when, result=outcome, excinfo=failure
        )

    def __repr__(self):
        if self.excinfo is not None:
            return "<CallInfo when={!r} excinfo={!r}>".format(self.when, self.excinfo)
        return "<CallInfo when={!r} result: {!r}>".format(self.when, self._result)
def pytest_runtest_makereport(item, call):
    """Build the ``TestReport`` for ``item`` from the outcome held in ``call``."""
    report = TestReport.from_item_and_call(item, call)
    return report
def pytest_make_collect_report(collector: Collector) -> CollectReport:
    """Collect ``collector`` and wrap the outcome in a ``CollectReport``.

    The outcome is "passed" when collection did not raise, "skipped" when it
    raised ``Skipped`` (or ``unittest.SkipTest`` if ``unittest`` has been
    imported), and "failed" otherwise.  The originating ``CallInfo`` is
    attached to the report as ``call``.
    """
    call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
    longrepr = None
    if not call.excinfo:
        outcome = "passed"
    else:
        skip_types = (Skipped,)  # type: Tuple[Type[BaseException], ...]
        unittest = sys.modules.get("unittest")
        if unittest is not None:
            # Type ignored because unittest is loaded dynamically.
            skip_types += (unittest.SkipTest,)  # type: ignore
        if call.excinfo.errisinstance(skip_types):
            outcome = "skipped"
            line_repr = collector._repr_failure_py(call.excinfo, "line")
            assert isinstance(line_repr, ExceptionRepr), line_repr
            crash = line_repr.reprcrash
            assert crash
            longrepr = (str(crash.path), crash.lineno, crash.message)
        else:
            outcome = "failed"
            errorinfo = collector.repr_failure(call.excinfo)
            if not hasattr(errorinfo, "toterminal"):
                errorinfo = CollectErrorRepr(errorinfo)
            longrepr = errorinfo
    # ``call.result`` raises AttributeError when the call failed, so getattr
    # falls back to None in that case.
    collected = getattr(call, "result", None)
    rep = CollectReport(collector.nodeid, outcome, longrepr, collected)
    rep.call = call  # type: ignore # see collect_one_node
    return rep
class SetupState:
    """Shared state for setting up and tearing down test items or collectors."""

    def __init__(self):
        # Nodes appended by prepare(), outermost first.
        self.stack = []  # type: List[Node]
        # Finalizers registered through addfinalizer(), keyed by node.
        self._finalizers = {}  # type: Dict[Node, List[Callable[[], None]]]

    def addfinalizer(self, finalizer, colitem):
        """Register ``finalizer`` to run when ``colitem`` is torn down."""
        assert colitem and not isinstance(colitem, tuple)
        assert callable(finalizer)
        # assert colitem in self.stack  # some unit tests don't setup stack :/
        registered = self._finalizers.setdefault(colitem, [])
        registered.append(finalizer)

    def _pop_and_teardown(self):
        """Remove the innermost node from the stack and tear it down."""
        innermost = self.stack.pop()
        self._teardown_with_finalization(innermost)

    def _callfinalizers(self, colitem):
        """Run the finalizers of ``colitem``, most recently added first.

        All finalizers run even if some fail; afterwards the first captured
        exception is re-raised with its original traceback.
        """
        pending = self._finalizers.pop(colitem, None)
        first_error = None
        while pending:
            finalizer = pending.pop()
            try:
                finalizer()
            except TEST_OUTCOME:
                # XXX Only first exception will be seen by user,
                #     ideally all should be reported.
                if first_error is None:
                    first_error = sys.exc_info()
        if first_error:
            _, exc_value, exc_tb = first_error
            assert exc_value is not None
            raise exc_value.with_traceback(exc_tb)

    def _teardown_with_finalization(self, colitem):
        """Run the finalizers of ``colitem``, then its own ``teardown()``."""
        self._callfinalizers(colitem)
        colitem.teardown()
        # Sanity check: remaining finalizers must belong to nodes on the stack.
        for remaining in self._finalizers:
            assert remaining in self.stack

    def teardown_all(self):
        """Tear down every node on the stack and run all leftover finalizers."""
        while self.stack:
            self._pop_and_teardown()
        for leftover in list(self._finalizers):
            self._teardown_with_finalization(leftover)
        assert not self._finalizers

    def teardown_exact(self, item, nextitem):
        """Tear down the nodes that ``nextitem`` does not need.

        Without a (truthy) ``nextitem`` the whole stack is torn down.
        """
        chain = nextitem.listchain() if nextitem else []
        self._teardown_towards(chain or [])

    def _teardown_towards(self, needed_collectors):
        """Pop and tear down nodes until the stack is a prefix of the chain.

        Teardown continues past failures; the first captured exception is
        re-raised at the end with its original traceback.
        """
        first_error = None
        while self.stack:
            is_prefix = self.stack == needed_collectors[: len(self.stack)]
            if is_prefix:
                break
            try:
                self._pop_and_teardown()
            except TEST_OUTCOME:
                # XXX Only first exception will be seen by user,
                #     ideally all should be reported.
                if first_error is None:
                    first_error = sys.exc_info()
        if first_error:
            _, exc_value, exc_tb = first_error
            assert exc_value is not None
            raise exc_value.with_traceback(exc_tb)

    def prepare(self, colitem):
        """Set up the nodes along the collector chain of ``colitem``.

        Previously set up nodes that are not part of the chain are torn down
        first.  If a node still on the stack failed during an earlier setup,
        the stored exception is raised again.
        """
        chain = colitem.listchain()
        self._teardown_towards(chain)

        # check if the last collection node has raised an error
        for node in self.stack:
            if hasattr(node, "_prepare_exc"):
                _, exc_value, exc_tb = node._prepare_exc
                raise exc_value.with_traceback(exc_tb)

        not_yet_set_up = chain[len(self.stack) :]
        for node in not_yet_set_up:
            self.stack.append(node)
            try:
                node.setup()
            except TEST_OUTCOME:
                # Remember the failure so later items of this node fail too.
                node._prepare_exc = sys.exc_info()
                raise
def collect_one_node(collector):
    """Collect a single node through its hooks and return the collect report.

    ``pytest_collectstart`` is called first, then
    ``pytest_make_collect_report``.  A ``call`` entry attached to the report
    is removed again; if it holds an exception that qualifies for
    interaction, ``pytest_exception_interact`` is called with it.
    """
    hooks = collector.ihook
    hooks.pytest_collectstart(collector=collector)
    report = hooks.pytest_make_collect_report(collector=collector)
    call = report.__dict__.pop("call", None)
    if not call:
        return report
    if check_interactive_exception(call, report):
        hooks.pytest_exception_interact(node=collector, call=call, report=report)
    return report