Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2per-test stdout/stderr capturing mechanism. 

3 

4""" 

5import collections 

6import contextlib 

7import io 

8import os 

9import sys 

10from io import UnsupportedOperation 

11from tempfile import TemporaryFile 

12 

13import pytest 

14from _pytest.compat import CaptureIO 

15from _pytest.fixtures import FixtureRequest 

16 

17patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} 

18 

19 

20def pytest_addoption(parser): 

21 group = parser.getgroup("general") 

22 group._addoption( 

23 "--capture", 

24 action="store", 

25 default="fd" if hasattr(os, "dup") else "sys", 

26 metavar="method", 

27 choices=["fd", "sys", "no"], 

28 help="per-test capturing method: one of fd|sys|no.", 

29 ) 

30 group._addoption( 

31 "-s", 

32 action="store_const", 

33 const="no", 

34 dest="capture", 

35 help="shortcut for --capture=no.", 

36 ) 

37 

38 

39@pytest.hookimpl(hookwrapper=True) 

40def pytest_load_initial_conftests(early_config, parser, args): 

41 ns = early_config.known_args_namespace 

42 if ns.capture == "fd": 

43 _py36_windowsconsoleio_workaround(sys.stdout) 

44 _colorama_workaround() 

45 _readline_workaround() 

46 pluginmanager = early_config.pluginmanager 

47 capman = CaptureManager(ns.capture) 

48 pluginmanager.register(capman, "capturemanager") 

49 

50 # make sure that capturemanager is properly reset at final shutdown 

51 early_config.add_cleanup(capman.stop_global_capturing) 

52 

53 # finally trigger conftest loading but while capturing (issue93) 

54 capman.start_global_capturing() 

55 outcome = yield 

56 capman.suspend_global_capture() 

57 if outcome.excinfo is not None: 

58 out, err = capman.read_global_capture() 

59 sys.stdout.write(out) 

60 sys.stderr.write(err) 

61 

62 

63class CaptureManager: 

64 """ 

65 Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each 

66 test phase (setup, call, teardown). After each of those points, the captured output is obtained and 

67 attached to the collection/runtest report. 

68 

69 There are two levels of capture: 

70 * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled 

71 during collection and each test phase. 

72 * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this 

73 case special handling is needed to ensure the fixtures take precedence over the global capture. 

74 """ 

75 

76 def __init__(self, method): 

77 self._method = method 

78 self._global_capturing = None 

79 self._current_item = None 

80 

81 def __repr__(self): 

82 return "<CaptureManager _method={!r} _global_capturing={!r} _current_item={!r}>".format( 

83 self._method, self._global_capturing, self._current_item 

84 ) 

85 

86 def _getcapture(self, method): 

87 if method == "fd": 

88 return MultiCapture(out=True, err=True, Capture=FDCapture) 

89 elif method == "sys": 

90 return MultiCapture(out=True, err=True, Capture=SysCapture) 

91 elif method == "no": 

92 return MultiCapture(out=False, err=False, in_=False) 

93 raise ValueError("unknown capturing method: %r" % method) # pragma: no cover 

94 

95 def is_capturing(self): 

96 if self.is_globally_capturing(): 

97 return "global" 

98 capture_fixture = getattr(self._current_item, "_capture_fixture", None) 

99 if capture_fixture is not None: 

100 return ( 

101 "fixture %s" % self._current_item._capture_fixture.request.fixturename 

102 ) 

103 return False 

104 

105 # Global capturing control 

106 

107 def is_globally_capturing(self): 

108 return self._method != "no" 

109 

110 def start_global_capturing(self): 

111 assert self._global_capturing is None 

112 self._global_capturing = self._getcapture(self._method) 

113 self._global_capturing.start_capturing() 

114 

115 def stop_global_capturing(self): 

116 if self._global_capturing is not None: 

117 self._global_capturing.pop_outerr_to_orig() 

118 self._global_capturing.stop_capturing() 

119 self._global_capturing = None 

120 

121 def resume_global_capture(self): 

122 # During teardown of the python process, and on rare occasions, capture 

123 # attributes can be `None` while trying to resume global capture. 

124 if self._global_capturing is not None: 

125 self._global_capturing.resume_capturing() 

126 

127 def suspend_global_capture(self, in_=False): 

128 cap = getattr(self, "_global_capturing", None) 

129 if cap is not None: 

130 cap.suspend_capturing(in_=in_) 

131 

132 def suspend(self, in_=False): 

133 # Need to undo local capsys-et-al if it exists before disabling global capture. 

134 self.suspend_fixture(self._current_item) 

135 self.suspend_global_capture(in_) 

136 

137 def resume(self): 

138 self.resume_global_capture() 

139 self.resume_fixture(self._current_item) 

140 

141 def read_global_capture(self): 

142 return self._global_capturing.readouterr() 

143 

144 # Fixture Control (it's just forwarding, think about removing this later) 

145 

146 def activate_fixture(self, item): 

147 """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over 

148 the global capture. 

149 """ 

150 fixture = getattr(item, "_capture_fixture", None) 

151 if fixture is not None: 

152 fixture._start() 

153 

154 def deactivate_fixture(self, item): 

155 """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" 

156 fixture = getattr(item, "_capture_fixture", None) 

157 if fixture is not None: 

158 fixture.close() 

159 

160 def suspend_fixture(self, item): 

161 fixture = getattr(item, "_capture_fixture", None) 

162 if fixture is not None: 

163 fixture._suspend() 

164 

165 def resume_fixture(self, item): 

166 fixture = getattr(item, "_capture_fixture", None) 

167 if fixture is not None: 

168 fixture._resume() 

169 

170 # Helper context managers 

171 

172 @contextlib.contextmanager 

173 def global_and_fixture_disabled(self): 

174 """Context manager to temporarily disable global and current fixture capturing.""" 

175 self.suspend() 

176 try: 

177 yield 

178 finally: 

179 self.resume() 

180 

181 @contextlib.contextmanager 

182 def item_capture(self, when, item): 

183 self.resume_global_capture() 

184 self.activate_fixture(item) 

185 try: 

186 yield 

187 finally: 

188 self.deactivate_fixture(item) 

189 self.suspend_global_capture(in_=False) 

190 

191 out, err = self.read_global_capture() 

192 item.add_report_section(when, "stdout", out) 

193 item.add_report_section(when, "stderr", err) 

194 

195 # Hooks 

196 

197 @pytest.hookimpl(hookwrapper=True) 

198 def pytest_make_collect_report(self, collector): 

199 if isinstance(collector, pytest.File): 

200 self.resume_global_capture() 

201 outcome = yield 

202 self.suspend_global_capture() 

203 out, err = self.read_global_capture() 

204 rep = outcome.get_result() 

205 if out: 

206 rep.sections.append(("Captured stdout", out)) 

207 if err: 

208 rep.sections.append(("Captured stderr", err)) 

209 else: 

210 yield 

211 

212 @pytest.hookimpl(hookwrapper=True) 

213 def pytest_runtest_protocol(self, item): 

214 self._current_item = item 

215 yield 

216 self._current_item = None 

217 

218 @pytest.hookimpl(hookwrapper=True) 

219 def pytest_runtest_setup(self, item): 

220 with self.item_capture("setup", item): 

221 yield 

222 

223 @pytest.hookimpl(hookwrapper=True) 

224 def pytest_runtest_call(self, item): 

225 with self.item_capture("call", item): 

226 yield 

227 

228 @pytest.hookimpl(hookwrapper=True) 

229 def pytest_runtest_teardown(self, item): 

230 with self.item_capture("teardown", item): 

231 yield 

232 

233 @pytest.hookimpl(tryfirst=True) 

234 def pytest_keyboard_interrupt(self, excinfo): 

235 self.stop_global_capturing() 

236 

237 @pytest.hookimpl(tryfirst=True) 

238 def pytest_internalerror(self, excinfo): 

239 self.stop_global_capturing() 

240 

241 

242capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"} 

243 

244 

245def _ensure_only_one_capture_fixture(request: FixtureRequest, name): 

246 fixtures = sorted(set(request.fixturenames) & capture_fixtures - {name}) 

247 if fixtures: 

248 arg = fixtures[0] if len(fixtures) == 1 else fixtures 

249 raise request.raiseerror( 

250 "cannot use {} and {} at the same time".format(arg, name) 

251 ) 

252 

253 

254@pytest.fixture 

255def capsys(request): 

256 """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. 

257 

258 The captured output is made available via ``capsys.readouterr()`` method 

259 calls, which return a ``(out, err)`` namedtuple. 

260 ``out`` and ``err`` will be ``text`` objects. 

261 """ 

262 _ensure_only_one_capture_fixture(request, "capsys") 

263 with _install_capture_fixture_on_item(request, SysCapture) as fixture: 

264 yield fixture 

265 

266 

267@pytest.fixture 

268def capsysbinary(request): 

269 """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. 

270 

271 The captured output is made available via ``capsysbinary.readouterr()`` 

272 method calls, which return a ``(out, err)`` namedtuple. 

273 ``out`` and ``err`` will be ``bytes`` objects. 

274 """ 

275 _ensure_only_one_capture_fixture(request, "capsysbinary") 

276 with _install_capture_fixture_on_item(request, SysCaptureBinary) as fixture: 

277 yield fixture 

278 

279 

280@pytest.fixture 

281def capfd(request): 

282 """Enable text capturing of writes to file descriptors ``1`` and ``2``. 

283 

284 The captured output is made available via ``capfd.readouterr()`` method 

285 calls, which return a ``(out, err)`` namedtuple. 

286 ``out`` and ``err`` will be ``text`` objects. 

287 """ 

288 _ensure_only_one_capture_fixture(request, "capfd") 

289 if not hasattr(os, "dup"): 

290 pytest.skip( 

291 "capfd fixture needs os.dup function which is not available in this system" 

292 ) 

293 with _install_capture_fixture_on_item(request, FDCapture) as fixture: 

294 yield fixture 

295 

296 

297@pytest.fixture 

298def capfdbinary(request): 

299 """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. 

300 

301 The captured output is made available via ``capfd.readouterr()`` method 

302 calls, which return a ``(out, err)`` namedtuple. 

303 ``out`` and ``err`` will be ``byte`` objects. 

304 """ 

305 _ensure_only_one_capture_fixture(request, "capfdbinary") 

306 if not hasattr(os, "dup"): 

307 pytest.skip( 

308 "capfdbinary fixture needs os.dup function which is not available in this system" 

309 ) 

310 with _install_capture_fixture_on_item(request, FDCaptureBinary) as fixture: 

311 yield fixture 

312 

313 

314@contextlib.contextmanager 

315def _install_capture_fixture_on_item(request, capture_class): 

316 """ 

317 Context manager which creates a ``CaptureFixture`` instance and "installs" it on 

318 the item/node of the given request. Used by ``capsys`` and ``capfd``. 

319 

320 The CaptureFixture is added as attribute of the item because it needs to accessed 

321 by ``CaptureManager`` during its ``pytest_runtest_*`` hooks. 

322 """ 

323 request.node._capture_fixture = fixture = CaptureFixture(capture_class, request) 

324 capmanager = request.config.pluginmanager.getplugin("capturemanager") 

325 # Need to active this fixture right away in case it is being used by another fixture (setup phase). 

326 # If this fixture is being used only by a test function (call phase), then we wouldn't need this 

327 # activation, but it doesn't hurt. 

328 capmanager.activate_fixture(request.node) 

329 yield fixture 

330 fixture.close() 

331 del request.node._capture_fixture 

332 

333 

334class CaptureFixture: 

335 """ 

336 Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` 

337 fixtures. 

338 """ 

339 

340 def __init__(self, captureclass, request): 

341 self.captureclass = captureclass 

342 self.request = request 

343 self._capture = None 

344 self._captured_out = self.captureclass.EMPTY_BUFFER 

345 self._captured_err = self.captureclass.EMPTY_BUFFER 

346 

347 def _start(self): 

348 if self._capture is None: 

349 self._capture = MultiCapture( 

350 out=True, err=True, in_=False, Capture=self.captureclass 

351 ) 

352 self._capture.start_capturing() 

353 

354 def close(self): 

355 if self._capture is not None: 

356 out, err = self._capture.pop_outerr_to_orig() 

357 self._captured_out += out 

358 self._captured_err += err 

359 self._capture.stop_capturing() 

360 self._capture = None 

361 

362 def readouterr(self): 

363 """Read and return the captured output so far, resetting the internal buffer. 

364 

365 :return: captured content as a namedtuple with ``out`` and ``err`` string attributes 

366 """ 

367 captured_out, captured_err = self._captured_out, self._captured_err 

368 if self._capture is not None: 

369 out, err = self._capture.readouterr() 

370 captured_out += out 

371 captured_err += err 

372 self._captured_out = self.captureclass.EMPTY_BUFFER 

373 self._captured_err = self.captureclass.EMPTY_BUFFER 

374 return CaptureResult(captured_out, captured_err) 

375 

376 def _suspend(self): 

377 """Suspends this fixture's own capturing temporarily.""" 

378 if self._capture is not None: 

379 self._capture.suspend_capturing() 

380 

381 def _resume(self): 

382 """Resumes this fixture's own capturing temporarily.""" 

383 if self._capture is not None: 

384 self._capture.resume_capturing() 

385 

386 @contextlib.contextmanager 

387 def disabled(self): 

388 """Temporarily disables capture while inside the 'with' block.""" 

389 capmanager = self.request.config.pluginmanager.getplugin("capturemanager") 

390 with capmanager.global_and_fixture_disabled(): 

391 yield 

392 

393 

394def safe_text_dupfile(f, mode, default_encoding="UTF8"): 

395 """ return an open text file object that's a duplicate of f on the 

396 FD-level if possible. 

397 """ 

398 encoding = getattr(f, "encoding", None) 

399 try: 

400 fd = f.fileno() 

401 except Exception: 

402 if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"): 

403 # we seem to have a text stream, let's just use it 

404 return f 

405 else: 

406 newfd = os.dup(fd) 

407 if "b" not in mode: 

408 mode += "b" 

409 f = os.fdopen(newfd, mode, 0) # no buffering 

410 return EncodedFile(f, encoding or default_encoding) 

411 

412 

413class EncodedFile: 

414 errors = "strict" # possibly needed by py3 code (issue555) 

415 

416 def __init__(self, buffer, encoding): 

417 self.buffer = buffer 

418 self.encoding = encoding 

419 

420 def write(self, obj): 

421 if isinstance(obj, str): 

422 obj = obj.encode(self.encoding, "replace") 

423 else: 

424 raise TypeError( 

425 "write() argument must be str, not {}".format(type(obj).__name__) 

426 ) 

427 self.buffer.write(obj) 

428 

429 def writelines(self, linelist): 

430 data = "".join(linelist) 

431 self.write(data) 

432 

433 @property 

434 def name(self): 

435 """Ensure that file.name is a string.""" 

436 return repr(self.buffer) 

437 

438 @property 

439 def mode(self): 

440 return self.buffer.mode.replace("b", "") 

441 

442 def __getattr__(self, name): 

443 return getattr(object.__getattribute__(self, "buffer"), name) 

444 

445 

446CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) 

447 

448 

449class MultiCapture: 

450 out = err = in_ = None 

451 _state = None 

452 

453 def __init__(self, out=True, err=True, in_=True, Capture=None): 

454 if in_: 

455 self.in_ = Capture(0) 

456 if out: 

457 self.out = Capture(1) 

458 if err: 

459 self.err = Capture(2) 

460 

461 def __repr__(self): 

462 return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format( 

463 self.out, 

464 self.err, 

465 self.in_, 

466 self._state, 

467 getattr(self, "_in_suspended", "<UNSET>"), 

468 ) 

469 

470 def start_capturing(self): 

471 self._state = "started" 

472 if self.in_: 

473 self.in_.start() 

474 if self.out: 

475 self.out.start() 

476 if self.err: 

477 self.err.start() 

478 

479 def pop_outerr_to_orig(self): 

480 """ pop current snapshot out/err capture and flush to orig streams. """ 

481 out, err = self.readouterr() 

482 if out: 

483 self.out.writeorg(out) 

484 if err: 

485 self.err.writeorg(err) 

486 return out, err 

487 

488 def suspend_capturing(self, in_=False): 

489 self._state = "suspended" 

490 if self.out: 

491 self.out.suspend() 

492 if self.err: 

493 self.err.suspend() 

494 if in_ and self.in_: 

495 self.in_.suspend() 

496 self._in_suspended = True 

497 

498 def resume_capturing(self): 

499 self._state = "resumed" 

500 if self.out: 

501 self.out.resume() 

502 if self.err: 

503 self.err.resume() 

504 if hasattr(self, "_in_suspended"): 

505 self.in_.resume() 

506 del self._in_suspended 

507 

508 def stop_capturing(self): 

509 """ stop capturing and reset capturing streams """ 

510 if self._state == "stopped": 

511 raise ValueError("was already stopped") 

512 self._state = "stopped" 

513 if self.out: 

514 self.out.done() 

515 if self.err: 

516 self.err.done() 

517 if self.in_: 

518 self.in_.done() 

519 

520 def readouterr(self): 

521 """ return snapshot unicode value of stdout/stderr capturings. """ 

522 return CaptureResult( 

523 self.out.snap() if self.out is not None else "", 

524 self.err.snap() if self.err is not None else "", 

525 ) 

526 

527 

528class NoCapture: 

529 EMPTY_BUFFER = None 

530 __init__ = start = done = suspend = resume = lambda *args: None 

531 

532 

533class FDCaptureBinary: 

534 """Capture IO to/from a given os-level filedescriptor. 

535 

536 snap() produces `bytes` 

537 """ 

538 

539 EMPTY_BUFFER = b"" 

540 _state = None 

541 

542 def __init__(self, targetfd, tmpfile=None): 

543 self.targetfd = targetfd 

544 try: 

545 self.targetfd_save = os.dup(self.targetfd) 

546 except OSError: 

547 self.start = lambda: None 

548 self.done = lambda: None 

549 else: 

550 self.start = self._start 

551 self.done = self._done 

552 if targetfd == 0: 

553 assert not tmpfile, "cannot set tmpfile with stdin" 

554 tmpfile = open(os.devnull, "r") 

555 self.syscapture = SysCapture(targetfd) 

556 else: 

557 if tmpfile is None: 

558 f = TemporaryFile() 

559 with f: 

560 tmpfile = safe_text_dupfile(f, mode="wb+") 

561 if targetfd in patchsysdict: 

562 self.syscapture = SysCapture(targetfd, tmpfile) 

563 else: 

564 self.syscapture = NoCapture() 

565 self.tmpfile = tmpfile 

566 self.tmpfile_fd = tmpfile.fileno() 

567 

568 def __repr__(self): 

569 return "<FDCapture {} oldfd={} _state={!r}>".format( 

570 self.targetfd, getattr(self, "targetfd_save", None), self._state 

571 ) 

572 

573 def _start(self): 

574 """ Start capturing on targetfd using memorized tmpfile. """ 

575 try: 

576 os.fstat(self.targetfd_save) 

577 except (AttributeError, OSError): 

578 raise ValueError("saved filedescriptor not valid anymore") 

579 os.dup2(self.tmpfile_fd, self.targetfd) 

580 self.syscapture.start() 

581 self._state = "started" 

582 

583 def snap(self): 

584 self.tmpfile.seek(0) 

585 res = self.tmpfile.read() 

586 self.tmpfile.seek(0) 

587 self.tmpfile.truncate() 

588 return res 

589 

590 def _done(self): 

591 """ stop capturing, restore streams, return original capture file, 

592 seeked to position zero. """ 

593 targetfd_save = self.__dict__.pop("targetfd_save") 

594 os.dup2(targetfd_save, self.targetfd) 

595 os.close(targetfd_save) 

596 self.syscapture.done() 

597 self.tmpfile.close() 

598 self._state = "done" 

599 

600 def suspend(self): 

601 self.syscapture.suspend() 

602 os.dup2(self.targetfd_save, self.targetfd) 

603 self._state = "suspended" 

604 

605 def resume(self): 

606 self.syscapture.resume() 

607 os.dup2(self.tmpfile_fd, self.targetfd) 

608 self._state = "resumed" 

609 

610 def writeorg(self, data): 

611 """ write to original file descriptor. """ 

612 if isinstance(data, str): 

613 data = data.encode("utf8") # XXX use encoding of original stream 

614 os.write(self.targetfd_save, data) 

615 

616 

617class FDCapture(FDCaptureBinary): 

618 """Capture IO to/from a given os-level filedescriptor. 

619 

620 snap() produces text 

621 """ 

622 

623 # Ignore type because it doesn't match the type in the superclass (bytes). 

624 EMPTY_BUFFER = str() # type: ignore 

625 

626 def snap(self): 

627 res = super().snap() 

628 enc = getattr(self.tmpfile, "encoding", None) 

629 if enc and isinstance(res, bytes): 

630 res = str(res, enc, "replace") 

631 return res 

632 

633 

634class SysCapture: 

635 

636 EMPTY_BUFFER = str() 

637 _state = None 

638 

639 def __init__(self, fd, tmpfile=None): 

640 name = patchsysdict[fd] 

641 self._old = getattr(sys, name) 

642 self.name = name 

643 if tmpfile is None: 

644 if name == "stdin": 

645 tmpfile = DontReadFromInput() 

646 else: 

647 tmpfile = CaptureIO() 

648 self.tmpfile = tmpfile 

649 

650 def __repr__(self): 

651 return "<SysCapture {} _old={!r}, tmpfile={!r} _state={!r}>".format( 

652 self.name, self._old, self.tmpfile, self._state 

653 ) 

654 

655 def start(self): 

656 setattr(sys, self.name, self.tmpfile) 

657 self._state = "started" 

658 

659 def snap(self): 

660 res = self.tmpfile.getvalue() 

661 self.tmpfile.seek(0) 

662 self.tmpfile.truncate() 

663 return res 

664 

665 def done(self): 

666 setattr(sys, self.name, self._old) 

667 del self._old 

668 self.tmpfile.close() 

669 self._state = "done" 

670 

671 def suspend(self): 

672 setattr(sys, self.name, self._old) 

673 self._state = "suspended" 

674 

675 def resume(self): 

676 setattr(sys, self.name, self.tmpfile) 

677 self._state = "resumed" 

678 

679 def writeorg(self, data): 

680 self._old.write(data) 

681 self._old.flush() 

682 

683 

684class SysCaptureBinary(SysCapture): 

685 # Ignore type because it doesn't match the type in the superclass (str). 

686 EMPTY_BUFFER = b"" # type: ignore 

687 

688 def snap(self): 

689 res = self.tmpfile.buffer.getvalue() 

690 self.tmpfile.seek(0) 

691 self.tmpfile.truncate() 

692 return res 

693 

694 

695class DontReadFromInput: 

696 encoding = None 

697 

698 def read(self, *args): 

699 raise IOError( 

700 "pytest: reading from stdin while output is captured! Consider using `-s`." 

701 ) 

702 

703 readline = read 

704 readlines = read 

705 __next__ = read 

706 

707 def __iter__(self): 

708 return self 

709 

710 def fileno(self): 

711 raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") 

712 

713 def isatty(self): 

714 return False 

715 

716 def close(self): 

717 pass 

718 

719 @property 

720 def buffer(self): 

721 return self 

722 

723 

724def _colorama_workaround(): 

725 """ 

726 Ensure colorama is imported so that it attaches to the correct stdio 

727 handles on Windows. 

728 

729 colorama uses the terminal on import time. So if something does the 

730 first import of colorama while I/O capture is active, colorama will 

731 fail in various ways. 

732 """ 

733 if sys.platform.startswith("win32"): 

734 try: 

735 import colorama # noqa: F401 

736 except ImportError: 

737 pass 

738 

739 

740def _readline_workaround(): 

741 """ 

742 Ensure readline is imported so that it attaches to the correct stdio 

743 handles on Windows. 

744 

745 Pdb uses readline support where available--when not running from the Python 

746 prompt, the readline module is not imported until running the pdb REPL. If 

747 running pytest with the --pdb option this means the readline module is not 

748 imported until after I/O capture has been started. 

749 

750 This is a problem for pyreadline, which is often used to implement readline 

751 support on Windows, as it does not attach to the correct handles for stdout 

752 and/or stdin if they have been redirected by the FDCapture mechanism. This 

753 workaround ensures that readline is imported before I/O capture is setup so 

754 that it can attach to the actual stdin/out for the console. 

755 

756 See https://github.com/pytest-dev/pytest/pull/1281 

757 """ 

758 if sys.platform.startswith("win32"): 

759 try: 

760 import readline # noqa: F401 

761 except ImportError: 

762 pass 

763 

764 

765def _py36_windowsconsoleio_workaround(stream): 

766 """ 

767 Python 3.6 implemented unicode console handling for Windows. This works 

768 by reading/writing to the raw console handle using 

769 ``{Read,Write}ConsoleW``. 

770 

771 The problem is that we are going to ``dup2`` over the stdio file 

772 descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the 

773 handles used by Python to write to the console. Though there is still some 

774 weirdness and the console handle seems to only be closed randomly and not 

775 on the first call to ``CloseHandle``, or maybe it gets reopened with the 

776 same handle value when we suspend capturing. 

777 

778 The workaround in this case will reopen stdio with a different fd which 

779 also means a different handle by replicating the logic in 

780 "Py_lifecycle.c:initstdio/create_stdio". 

781 

782 :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given 

783 here as parameter for unittesting purposes. 

784 

785 See https://github.com/pytest-dev/py/issues/103 

786 """ 

787 if ( 

788 not sys.platform.startswith("win32") 

789 or sys.version_info[:2] < (3, 6) 

790 or hasattr(sys, "pypy_version_info") 

791 ): 

792 return 

793 

794 # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) 

795 if not hasattr(stream, "buffer"): 

796 return 

797 

798 buffered = hasattr(stream.buffer, "raw") 

799 raw_stdout = stream.buffer.raw if buffered else stream.buffer 

800 

801 if not isinstance(raw_stdout, io._WindowsConsoleIO): 

802 return 

803 

804 def _reopen_stdio(f, mode): 

805 if not buffered and mode[0] == "w": 

806 buffering = 0 

807 else: 

808 buffering = -1 

809 

810 return io.TextIOWrapper( 

811 open(os.dup(f.fileno()), mode, buffering), 

812 f.encoding, 

813 f.errors, 

814 f.newlines, 

815 f.line_buffering, 

816 ) 

817 

818 sys.stdin = _reopen_stdio(sys.stdin, "rb") 

819 sys.stdout = _reopen_stdio(sys.stdout, "wb") 

820 sys.stderr = _reopen_stdio(sys.stderr, "wb")