Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from io import StringIO 

2from pprint import pprint 

3from typing import Any 

4from typing import List 

5from typing import Optional 

6from typing import Tuple 

7from typing import Union 

8 

9import py 

10 

11from _pytest._code.code import ExceptionChainRepr 

12from _pytest._code.code import ExceptionInfo 

13from _pytest._code.code import ReprEntry 

14from _pytest._code.code import ReprEntryNative 

15from _pytest._code.code import ReprExceptionInfo 

16from _pytest._code.code import ReprFileLocation 

17from _pytest._code.code import ReprFuncArgs 

18from _pytest._code.code import ReprLocals 

19from _pytest._code.code import ReprTraceback 

20from _pytest._code.code import TerminalRepr 

21from _pytest.compat import TYPE_CHECKING 

22from _pytest.nodes import Node 

23from _pytest.outcomes import skip 

24from _pytest.pathlib import Path 

25 

26 

27def getslaveinfoline(node): 

28 try: 

29 return node._slaveinfocache 

30 except AttributeError: 

31 d = node.slaveinfo 

32 ver = "%s.%s.%s" % d["version_info"][:3] 

33 node._slaveinfocache = s = "[{}] {} -- Python {} {}".format( 

34 d["id"], d["sysplatform"], ver, d["executable"] 

35 ) 

36 return s 

37 

38 

39class BaseReport: 

40 when = None # type: Optional[str] 

41 location = None # type: Optional[Tuple[str, Optional[int], str]] 

42 longrepr = None 

43 sections = [] # type: List[Tuple[str, str]] 

44 nodeid = None # type: str 

45 

46 def __init__(self, **kw: Any) -> None: 

47 self.__dict__.update(kw) 

48 

49 if TYPE_CHECKING: 

50 # Can have arbitrary fields given to __init__(). 

51 def __getattr__(self, key: str) -> Any: 

52 raise NotImplementedError() 

53 

54 def toterminal(self, out) -> None: 

55 if hasattr(self, "node"): 

56 out.line(getslaveinfoline(self.node)) # type: ignore 

57 

58 longrepr = self.longrepr 

59 if longrepr is None: 

60 return 

61 

62 if hasattr(longrepr, "toterminal"): 

63 longrepr.toterminal(out) 

64 else: 

65 try: 

66 out.line(longrepr) 

67 except UnicodeEncodeError: 

68 out.line("<unprintable longrepr>") 

69 

70 def get_sections(self, prefix): 

71 for name, content in self.sections: 

72 if name.startswith(prefix): 

73 yield prefix, content 

74 

75 @property 

76 def longreprtext(self): 

77 """ 

78 Read-only property that returns the full string representation 

79 of ``longrepr``. 

80 

81 .. versionadded:: 3.0 

82 """ 

83 tw = py.io.TerminalWriter(stringio=True) 

84 tw.hasmarkup = False 

85 self.toterminal(tw) 

86 exc = tw.stringio.getvalue() 

87 return exc.strip() 

88 

89 @property 

90 def caplog(self): 

91 """Return captured log lines, if log capturing is enabled 

92 

93 .. versionadded:: 3.5 

94 """ 

95 return "\n".join( 

96 content for (prefix, content) in self.get_sections("Captured log") 

97 ) 

98 

99 @property 

100 def capstdout(self): 

101 """Return captured text from stdout, if capturing is enabled 

102 

103 .. versionadded:: 3.0 

104 """ 

105 return "".join( 

106 content for (prefix, content) in self.get_sections("Captured stdout") 

107 ) 

108 

109 @property 

110 def capstderr(self): 

111 """Return captured text from stderr, if capturing is enabled 

112 

113 .. versionadded:: 3.0 

114 """ 

115 return "".join( 

116 content for (prefix, content) in self.get_sections("Captured stderr") 

117 ) 

118 

119 passed = property(lambda x: x.outcome == "passed") 

120 failed = property(lambda x: x.outcome == "failed") 

121 skipped = property(lambda x: x.outcome == "skipped") 

122 

123 @property 

124 def fspath(self) -> str: 

125 return self.nodeid.split("::")[0] 

126 

127 @property 

128 def count_towards_summary(self): 

129 """ 

130 **Experimental** 

131 

132 Returns True if this report should be counted towards the totals shown at the end of the 

133 test session: "1 passed, 1 failure, etc". 

134 

135 .. note:: 

136 

137 This function is considered **experimental**, so beware that it is subject to changes 

138 even in patch releases. 

139 """ 

140 return True 

141 

142 @property 

143 def head_line(self): 

144 """ 

145 **Experimental** 

146 

147 Returns the head line shown with longrepr output for this report, more commonly during 

148 traceback representation during failures:: 

149 

150 ________ Test.foo ________ 

151 

152 

153 In the example above, the head_line is "Test.foo". 

154 

155 .. note:: 

156 

157 This function is considered **experimental**, so beware that it is subject to changes 

158 even in patch releases. 

159 """ 

160 if self.location is not None: 

161 fspath, lineno, domain = self.location 

162 return domain 

163 

164 def _get_verbose_word(self, config): 

165 _category, _short, verbose = config.hook.pytest_report_teststatus( 

166 report=self, config=config 

167 ) 

168 return verbose 

169 

170 def _to_json(self): 

171 """ 

172 This was originally the serialize_report() function from xdist (ca03269). 

173 

174 Returns the contents of this report as a dict of builtin entries, suitable for 

175 serialization. 

176 

177 Experimental method. 

178 """ 

179 return _report_to_json(self) 

180 

181 @classmethod 

182 def _from_json(cls, reportdict): 

183 """ 

184 This was originally the serialize_report() function from xdist (ca03269). 

185 

186 Factory method that returns either a TestReport or CollectReport, depending on the calling 

187 class. It's the callers responsibility to know which class to pass here. 

188 

189 Experimental method. 

190 """ 

191 kwargs = _report_kwargs_from_json(reportdict) 

192 return cls(**kwargs) 

193 

194 

195def _report_unserialization_failure(type_name, report_class, reportdict): 

196 url = "https://github.com/pytest-dev/pytest/issues" 

197 stream = StringIO() 

198 pprint("-" * 100, stream=stream) 

199 pprint("INTERNALERROR: Unknown entry type returned: %s" % type_name, stream=stream) 

200 pprint("report_name: %s" % report_class, stream=stream) 

201 pprint(reportdict, stream=stream) 

202 pprint("Please report this bug at %s" % url, stream=stream) 

203 pprint("-" * 100, stream=stream) 

204 raise RuntimeError(stream.getvalue()) 

205 

206 

207class TestReport(BaseReport): 

208 """ Basic test report object (also used for setup and teardown calls if 

209 they fail). 

210 """ 

211 

212 __test__ = False 

213 

214 def __init__( 

215 self, 

216 nodeid, 

217 location: Tuple[str, Optional[int], str], 

218 keywords, 

219 outcome, 

220 longrepr, 

221 when, 

222 sections=(), 

223 duration=0, 

224 user_properties=None, 

225 **extra 

226 ) -> None: 

227 #: normalized collection node id 

228 self.nodeid = nodeid 

229 

230 #: a (filesystempath, lineno, domaininfo) tuple indicating the 

231 #: actual location of a test item - it might be different from the 

232 #: collected one e.g. if a method is inherited from a different module. 

233 self.location = location # type: Tuple[str, Optional[int], str] 

234 

235 #: a name -> value dictionary containing all keywords and 

236 #: markers associated with a test invocation. 

237 self.keywords = keywords 

238 

239 #: test outcome, always one of "passed", "failed", "skipped". 

240 self.outcome = outcome 

241 

242 #: None or a failure representation. 

243 self.longrepr = longrepr 

244 

245 #: one of 'setup', 'call', 'teardown' to indicate runtest phase. 

246 self.when = when 

247 

248 #: user properties is a list of tuples (name, value) that holds user 

249 #: defined properties of the test 

250 self.user_properties = list(user_properties or []) 

251 

252 #: list of pairs ``(str, str)`` of extra information which needs to 

253 #: marshallable. Used by pytest to add captured text 

254 #: from ``stdout`` and ``stderr``, but may be used by other plugins 

255 #: to add arbitrary information to reports. 

256 self.sections = list(sections) 

257 

258 #: time it took to run just the test 

259 self.duration = duration 

260 

261 self.__dict__.update(extra) 

262 

263 def __repr__(self): 

264 return "<{} {!r} when={!r} outcome={!r}>".format( 

265 self.__class__.__name__, self.nodeid, self.when, self.outcome 

266 ) 

267 

268 @classmethod 

269 def from_item_and_call(cls, item, call) -> "TestReport": 

270 """ 

271 Factory method to create and fill a TestReport with standard item and call info. 

272 """ 

273 when = call.when 

274 duration = call.stop - call.start 

275 keywords = {x: 1 for x in item.keywords} 

276 excinfo = call.excinfo 

277 sections = [] 

278 if not call.excinfo: 

279 outcome = "passed" 

280 longrepr = None 

281 else: 

282 if not isinstance(excinfo, ExceptionInfo): 

283 outcome = "failed" 

284 longrepr = excinfo 

285 # Type ignored -- see comment where skip.Exception is defined. 

286 elif excinfo.errisinstance(skip.Exception): # type: ignore 

287 outcome = "skipped" 

288 r = excinfo._getreprcrash() 

289 longrepr = (str(r.path), r.lineno, r.message) 

290 else: 

291 outcome = "failed" 

292 if call.when == "call": 

293 longrepr = item.repr_failure(excinfo) 

294 else: # exception in setup or teardown 

295 longrepr = item._repr_failure_py( 

296 excinfo, style=item.config.getoption("tbstyle", "auto") 

297 ) 

298 for rwhen, key, content in item._report_sections: 

299 sections.append(("Captured {} {}".format(key, rwhen), content)) 

300 return cls( 

301 item.nodeid, 

302 item.location, 

303 keywords, 

304 outcome, 

305 longrepr, 

306 when, 

307 sections, 

308 duration, 

309 user_properties=item.user_properties, 

310 ) 

311 

312 

313class CollectReport(BaseReport): 

314 when = "collect" 

315 

316 def __init__( 

317 self, nodeid: str, outcome, longrepr, result: List[Node], sections=(), **extra 

318 ) -> None: 

319 self.nodeid = nodeid 

320 self.outcome = outcome 

321 self.longrepr = longrepr 

322 self.result = result or [] 

323 self.sections = list(sections) 

324 self.__dict__.update(extra) 

325 

326 @property 

327 def location(self): 

328 return (self.fspath, None, self.fspath) 

329 

330 def __repr__(self): 

331 return "<CollectReport {!r} lenresult={} outcome={!r}>".format( 

332 self.nodeid, len(self.result), self.outcome 

333 ) 

334 

335 

336class CollectErrorRepr(TerminalRepr): 

337 def __init__(self, msg): 

338 self.longrepr = msg 

339 

340 def toterminal(self, out) -> None: 

341 out.line(self.longrepr, red=True) 

342 

343 

344def pytest_report_to_serializable(report): 

345 if isinstance(report, (TestReport, CollectReport)): 

346 data = report._to_json() 

347 data["$report_type"] = report.__class__.__name__ 

348 return data 

349 

350 

351def pytest_report_from_serializable(data): 

352 if "$report_type" in data: 

353 if data["$report_type"] == "TestReport": 

354 return TestReport._from_json(data) 

355 elif data["$report_type"] == "CollectReport": 

356 return CollectReport._from_json(data) 

357 assert False, "Unknown report_type unserialize data: {}".format( 

358 data["$report_type"] 

359 ) 

360 

361 

362def _report_to_json(report): 

363 """ 

364 This was originally the serialize_report() function from xdist (ca03269). 

365 

366 Returns the contents of this report as a dict of builtin entries, suitable for 

367 serialization. 

368 """ 

369 

370 def serialize_repr_entry(entry): 

371 entry_data = {"type": type(entry).__name__, "data": entry.__dict__.copy()} 

372 for key, value in entry_data["data"].items(): 

373 if hasattr(value, "__dict__"): 

374 entry_data["data"][key] = value.__dict__.copy() 

375 return entry_data 

376 

377 def serialize_repr_traceback(reprtraceback): 

378 result = reprtraceback.__dict__.copy() 

379 result["reprentries"] = [ 

380 serialize_repr_entry(x) for x in reprtraceback.reprentries 

381 ] 

382 return result 

383 

384 def serialize_repr_crash(reprcrash: Optional[ReprFileLocation]): 

385 if reprcrash is not None: 

386 return reprcrash.__dict__.copy() 

387 else: 

388 return None 

389 

390 def serialize_longrepr(rep): 

391 result = { 

392 "reprcrash": serialize_repr_crash(rep.longrepr.reprcrash), 

393 "reprtraceback": serialize_repr_traceback(rep.longrepr.reprtraceback), 

394 "sections": rep.longrepr.sections, 

395 } 

396 if isinstance(rep.longrepr, ExceptionChainRepr): 

397 result["chain"] = [] 

398 for repr_traceback, repr_crash, description in rep.longrepr.chain: 

399 result["chain"].append( 

400 ( 

401 serialize_repr_traceback(repr_traceback), 

402 serialize_repr_crash(repr_crash), 

403 description, 

404 ) 

405 ) 

406 else: 

407 result["chain"] = None 

408 return result 

409 

410 d = report.__dict__.copy() 

411 if hasattr(report.longrepr, "toterminal"): 

412 if hasattr(report.longrepr, "reprtraceback") and hasattr( 

413 report.longrepr, "reprcrash" 

414 ): 

415 d["longrepr"] = serialize_longrepr(report) 

416 else: 

417 d["longrepr"] = str(report.longrepr) 

418 else: 

419 d["longrepr"] = report.longrepr 

420 for name in d: 

421 if isinstance(d[name], (py.path.local, Path)): 

422 d[name] = str(d[name]) 

423 elif name == "result": 

424 d[name] = None # for now 

425 return d 

426 

427 

428def _report_kwargs_from_json(reportdict): 

429 """ 

430 This was originally the serialize_report() function from xdist (ca03269). 

431 

432 Returns **kwargs that can be used to construct a TestReport or CollectReport instance. 

433 """ 

434 

435 def deserialize_repr_entry(entry_data): 

436 data = entry_data["data"] 

437 entry_type = entry_data["type"] 

438 if entry_type == "ReprEntry": 

439 reprfuncargs = None 

440 reprfileloc = None 

441 reprlocals = None 

442 if data["reprfuncargs"]: 

443 reprfuncargs = ReprFuncArgs(**data["reprfuncargs"]) 

444 if data["reprfileloc"]: 

445 reprfileloc = ReprFileLocation(**data["reprfileloc"]) 

446 if data["reprlocals"]: 

447 reprlocals = ReprLocals(data["reprlocals"]["lines"]) 

448 

449 reprentry = ReprEntry( 

450 lines=data["lines"], 

451 reprfuncargs=reprfuncargs, 

452 reprlocals=reprlocals, 

453 filelocrepr=reprfileloc, 

454 style=data["style"], 

455 ) # type: Union[ReprEntry, ReprEntryNative] 

456 elif entry_type == "ReprEntryNative": 

457 reprentry = ReprEntryNative(data["lines"]) 

458 else: 

459 _report_unserialization_failure(entry_type, TestReport, reportdict) 

460 return reprentry 

461 

462 def deserialize_repr_traceback(repr_traceback_dict): 

463 repr_traceback_dict["reprentries"] = [ 

464 deserialize_repr_entry(x) for x in repr_traceback_dict["reprentries"] 

465 ] 

466 return ReprTraceback(**repr_traceback_dict) 

467 

468 def deserialize_repr_crash(repr_crash_dict: Optional[dict]): 

469 if repr_crash_dict is not None: 

470 return ReprFileLocation(**repr_crash_dict) 

471 else: 

472 return None 

473 

474 if ( 

475 reportdict["longrepr"] 

476 and "reprcrash" in reportdict["longrepr"] 

477 and "reprtraceback" in reportdict["longrepr"] 

478 ): 

479 

480 reprtraceback = deserialize_repr_traceback( 

481 reportdict["longrepr"]["reprtraceback"] 

482 ) 

483 reprcrash = deserialize_repr_crash(reportdict["longrepr"]["reprcrash"]) 

484 if reportdict["longrepr"]["chain"]: 

485 chain = [] 

486 for repr_traceback_data, repr_crash_data, description in reportdict[ 

487 "longrepr" 

488 ]["chain"]: 

489 chain.append( 

490 ( 

491 deserialize_repr_traceback(repr_traceback_data), 

492 deserialize_repr_crash(repr_crash_data), 

493 description, 

494 ) 

495 ) 

496 exception_info = ExceptionChainRepr( 

497 chain 

498 ) # type: Union[ExceptionChainRepr,ReprExceptionInfo] 

499 else: 

500 exception_info = ReprExceptionInfo(reprtraceback, reprcrash) 

501 

502 for section in reportdict["longrepr"]["sections"]: 

503 exception_info.addsection(*section) 

504 reportdict["longrepr"] = exception_info 

505 

506 return reportdict