Coverage for /usr/local/lib/python3.7/site-packages/_pytest/capture.py : 16%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2per-test stdout/stderr capturing mechanism.
4"""
5import collections
6import contextlib
7import io
8import os
9import sys
10from io import UnsupportedOperation
11from tempfile import TemporaryFile
13import pytest
14from _pytest.compat import CaptureIO
15from _pytest.fixtures import FixtureRequest
17patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
20def pytest_addoption(parser):
21 group = parser.getgroup("general")
22 group._addoption(
23 "--capture",
24 action="store",
25 default="fd" if hasattr(os, "dup") else "sys",
26 metavar="method",
27 choices=["fd", "sys", "no"],
28 help="per-test capturing method: one of fd|sys|no.",
29 )
30 group._addoption(
31 "-s",
32 action="store_const",
33 const="no",
34 dest="capture",
35 help="shortcut for --capture=no.",
36 )
39@pytest.hookimpl(hookwrapper=True)
40def pytest_load_initial_conftests(early_config, parser, args):
41 ns = early_config.known_args_namespace
42 if ns.capture == "fd":
43 _py36_windowsconsoleio_workaround(sys.stdout)
44 _colorama_workaround()
45 _readline_workaround()
46 pluginmanager = early_config.pluginmanager
47 capman = CaptureManager(ns.capture)
48 pluginmanager.register(capman, "capturemanager")
50 # make sure that capturemanager is properly reset at final shutdown
51 early_config.add_cleanup(capman.stop_global_capturing)
53 # finally trigger conftest loading but while capturing (issue93)
54 capman.start_global_capturing()
55 outcome = yield
56 capman.suspend_global_capture()
57 if outcome.excinfo is not None:
58 out, err = capman.read_global_capture()
59 sys.stdout.write(out)
60 sys.stderr.write(err)
63class CaptureManager:
64 """
65 Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each
66 test phase (setup, call, teardown). After each of those points, the captured output is obtained and
67 attached to the collection/runtest report.
69 There are two levels of capture:
70 * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled
71 during collection and each test phase.
72 * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this
73 case special handling is needed to ensure the fixtures take precedence over the global capture.
74 """
76 def __init__(self, method):
77 self._method = method
78 self._global_capturing = None
79 self._current_item = None
81 def __repr__(self):
82 return "<CaptureManager _method={!r} _global_capturing={!r} _current_item={!r}>".format(
83 self._method, self._global_capturing, self._current_item
84 )
86 def _getcapture(self, method):
87 if method == "fd":
88 return MultiCapture(out=True, err=True, Capture=FDCapture)
89 elif method == "sys":
90 return MultiCapture(out=True, err=True, Capture=SysCapture)
91 elif method == "no":
92 return MultiCapture(out=False, err=False, in_=False)
93 raise ValueError("unknown capturing method: %r" % method) # pragma: no cover
95 def is_capturing(self):
96 if self.is_globally_capturing():
97 return "global"
98 capture_fixture = getattr(self._current_item, "_capture_fixture", None)
99 if capture_fixture is not None:
100 return (
101 "fixture %s" % self._current_item._capture_fixture.request.fixturename
102 )
103 return False
105 # Global capturing control
107 def is_globally_capturing(self):
108 return self._method != "no"
110 def start_global_capturing(self):
111 assert self._global_capturing is None
112 self._global_capturing = self._getcapture(self._method)
113 self._global_capturing.start_capturing()
115 def stop_global_capturing(self):
116 if self._global_capturing is not None:
117 self._global_capturing.pop_outerr_to_orig()
118 self._global_capturing.stop_capturing()
119 self._global_capturing = None
121 def resume_global_capture(self):
122 # During teardown of the python process, and on rare occasions, capture
123 # attributes can be `None` while trying to resume global capture.
124 if self._global_capturing is not None:
125 self._global_capturing.resume_capturing()
127 def suspend_global_capture(self, in_=False):
128 cap = getattr(self, "_global_capturing", None)
129 if cap is not None:
130 cap.suspend_capturing(in_=in_)
132 def suspend(self, in_=False):
133 # Need to undo local capsys-et-al if it exists before disabling global capture.
134 self.suspend_fixture(self._current_item)
135 self.suspend_global_capture(in_)
137 def resume(self):
138 self.resume_global_capture()
139 self.resume_fixture(self._current_item)
141 def read_global_capture(self):
142 return self._global_capturing.readouterr()
144 # Fixture Control (it's just forwarding, think about removing this later)
146 def activate_fixture(self, item):
147 """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
148 the global capture.
149 """
150 fixture = getattr(item, "_capture_fixture", None)
151 if fixture is not None:
152 fixture._start()
154 def deactivate_fixture(self, item):
155 """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any."""
156 fixture = getattr(item, "_capture_fixture", None)
157 if fixture is not None:
158 fixture.close()
160 def suspend_fixture(self, item):
161 fixture = getattr(item, "_capture_fixture", None)
162 if fixture is not None:
163 fixture._suspend()
165 def resume_fixture(self, item):
166 fixture = getattr(item, "_capture_fixture", None)
167 if fixture is not None:
168 fixture._resume()
170 # Helper context managers
172 @contextlib.contextmanager
173 def global_and_fixture_disabled(self):
174 """Context manager to temporarily disable global and current fixture capturing."""
175 self.suspend()
176 try:
177 yield
178 finally:
179 self.resume()
181 @contextlib.contextmanager
182 def item_capture(self, when, item):
183 self.resume_global_capture()
184 self.activate_fixture(item)
185 try:
186 yield
187 finally:
188 self.deactivate_fixture(item)
189 self.suspend_global_capture(in_=False)
191 out, err = self.read_global_capture()
192 item.add_report_section(when, "stdout", out)
193 item.add_report_section(when, "stderr", err)
195 # Hooks
197 @pytest.hookimpl(hookwrapper=True)
198 def pytest_make_collect_report(self, collector):
199 if isinstance(collector, pytest.File):
200 self.resume_global_capture()
201 outcome = yield
202 self.suspend_global_capture()
203 out, err = self.read_global_capture()
204 rep = outcome.get_result()
205 if out:
206 rep.sections.append(("Captured stdout", out))
207 if err:
208 rep.sections.append(("Captured stderr", err))
209 else:
210 yield
212 @pytest.hookimpl(hookwrapper=True)
213 def pytest_runtest_protocol(self, item):
214 self._current_item = item
215 yield
216 self._current_item = None
218 @pytest.hookimpl(hookwrapper=True)
219 def pytest_runtest_setup(self, item):
220 with self.item_capture("setup", item):
221 yield
223 @pytest.hookimpl(hookwrapper=True)
224 def pytest_runtest_call(self, item):
225 with self.item_capture("call", item):
226 yield
228 @pytest.hookimpl(hookwrapper=True)
229 def pytest_runtest_teardown(self, item):
230 with self.item_capture("teardown", item):
231 yield
233 @pytest.hookimpl(tryfirst=True)
234 def pytest_keyboard_interrupt(self, excinfo):
235 self.stop_global_capturing()
237 @pytest.hookimpl(tryfirst=True)
238 def pytest_internalerror(self, excinfo):
239 self.stop_global_capturing()
242capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"}
245def _ensure_only_one_capture_fixture(request: FixtureRequest, name):
246 fixtures = sorted(set(request.fixturenames) & capture_fixtures - {name})
247 if fixtures:
248 arg = fixtures[0] if len(fixtures) == 1 else fixtures
249 raise request.raiseerror(
250 "cannot use {} and {} at the same time".format(arg, name)
251 )
254@pytest.fixture
255def capsys(request):
256 """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.
258 The captured output is made available via ``capsys.readouterr()`` method
259 calls, which return a ``(out, err)`` namedtuple.
260 ``out`` and ``err`` will be ``text`` objects.
261 """
262 _ensure_only_one_capture_fixture(request, "capsys")
263 with _install_capture_fixture_on_item(request, SysCapture) as fixture:
264 yield fixture
267@pytest.fixture
268def capsysbinary(request):
269 """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.
271 The captured output is made available via ``capsysbinary.readouterr()``
272 method calls, which return a ``(out, err)`` namedtuple.
273 ``out`` and ``err`` will be ``bytes`` objects.
274 """
275 _ensure_only_one_capture_fixture(request, "capsysbinary")
276 with _install_capture_fixture_on_item(request, SysCaptureBinary) as fixture:
277 yield fixture
280@pytest.fixture
281def capfd(request):
282 """Enable text capturing of writes to file descriptors ``1`` and ``2``.
284 The captured output is made available via ``capfd.readouterr()`` method
285 calls, which return a ``(out, err)`` namedtuple.
286 ``out`` and ``err`` will be ``text`` objects.
287 """
288 _ensure_only_one_capture_fixture(request, "capfd")
289 if not hasattr(os, "dup"):
290 pytest.skip(
291 "capfd fixture needs os.dup function which is not available in this system"
292 )
293 with _install_capture_fixture_on_item(request, FDCapture) as fixture:
294 yield fixture
297@pytest.fixture
298def capfdbinary(request):
299 """Enable bytes capturing of writes to file descriptors ``1`` and ``2``.
301 The captured output is made available via ``capfd.readouterr()`` method
302 calls, which return a ``(out, err)`` namedtuple.
303 ``out`` and ``err`` will be ``byte`` objects.
304 """
305 _ensure_only_one_capture_fixture(request, "capfdbinary")
306 if not hasattr(os, "dup"):
307 pytest.skip(
308 "capfdbinary fixture needs os.dup function which is not available in this system"
309 )
310 with _install_capture_fixture_on_item(request, FDCaptureBinary) as fixture:
311 yield fixture
314@contextlib.contextmanager
315def _install_capture_fixture_on_item(request, capture_class):
316 """
317 Context manager which creates a ``CaptureFixture`` instance and "installs" it on
318 the item/node of the given request. Used by ``capsys`` and ``capfd``.
320 The CaptureFixture is added as attribute of the item because it needs to accessed
321 by ``CaptureManager`` during its ``pytest_runtest_*`` hooks.
322 """
323 request.node._capture_fixture = fixture = CaptureFixture(capture_class, request)
324 capmanager = request.config.pluginmanager.getplugin("capturemanager")
325 # Need to active this fixture right away in case it is being used by another fixture (setup phase).
326 # If this fixture is being used only by a test function (call phase), then we wouldn't need this
327 # activation, but it doesn't hurt.
328 capmanager.activate_fixture(request.node)
329 yield fixture
330 fixture.close()
331 del request.node._capture_fixture
334class CaptureFixture:
335 """
336 Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary`
337 fixtures.
338 """
340 def __init__(self, captureclass, request):
341 self.captureclass = captureclass
342 self.request = request
343 self._capture = None
344 self._captured_out = self.captureclass.EMPTY_BUFFER
345 self._captured_err = self.captureclass.EMPTY_BUFFER
347 def _start(self):
348 if self._capture is None:
349 self._capture = MultiCapture(
350 out=True, err=True, in_=False, Capture=self.captureclass
351 )
352 self._capture.start_capturing()
354 def close(self):
355 if self._capture is not None:
356 out, err = self._capture.pop_outerr_to_orig()
357 self._captured_out += out
358 self._captured_err += err
359 self._capture.stop_capturing()
360 self._capture = None
362 def readouterr(self):
363 """Read and return the captured output so far, resetting the internal buffer.
365 :return: captured content as a namedtuple with ``out`` and ``err`` string attributes
366 """
367 captured_out, captured_err = self._captured_out, self._captured_err
368 if self._capture is not None:
369 out, err = self._capture.readouterr()
370 captured_out += out
371 captured_err += err
372 self._captured_out = self.captureclass.EMPTY_BUFFER
373 self._captured_err = self.captureclass.EMPTY_BUFFER
374 return CaptureResult(captured_out, captured_err)
376 def _suspend(self):
377 """Suspends this fixture's own capturing temporarily."""
378 if self._capture is not None:
379 self._capture.suspend_capturing()
381 def _resume(self):
382 """Resumes this fixture's own capturing temporarily."""
383 if self._capture is not None:
384 self._capture.resume_capturing()
386 @contextlib.contextmanager
387 def disabled(self):
388 """Temporarily disables capture while inside the 'with' block."""
389 capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
390 with capmanager.global_and_fixture_disabled():
391 yield
394def safe_text_dupfile(f, mode, default_encoding="UTF8"):
395 """ return an open text file object that's a duplicate of f on the
396 FD-level if possible.
397 """
398 encoding = getattr(f, "encoding", None)
399 try:
400 fd = f.fileno()
401 except Exception:
402 if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
403 # we seem to have a text stream, let's just use it
404 return f
405 else:
406 newfd = os.dup(fd)
407 if "b" not in mode:
408 mode += "b"
409 f = os.fdopen(newfd, mode, 0) # no buffering
410 return EncodedFile(f, encoding or default_encoding)
413class EncodedFile:
414 errors = "strict" # possibly needed by py3 code (issue555)
416 def __init__(self, buffer, encoding):
417 self.buffer = buffer
418 self.encoding = encoding
420 def write(self, obj):
421 if isinstance(obj, str):
422 obj = obj.encode(self.encoding, "replace")
423 else:
424 raise TypeError(
425 "write() argument must be str, not {}".format(type(obj).__name__)
426 )
427 self.buffer.write(obj)
429 def writelines(self, linelist):
430 data = "".join(linelist)
431 self.write(data)
433 @property
434 def name(self):
435 """Ensure that file.name is a string."""
436 return repr(self.buffer)
438 @property
439 def mode(self):
440 return self.buffer.mode.replace("b", "")
442 def __getattr__(self, name):
443 return getattr(object.__getattribute__(self, "buffer"), name)
446CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"])
449class MultiCapture:
450 out = err = in_ = None
451 _state = None
453 def __init__(self, out=True, err=True, in_=True, Capture=None):
454 if in_:
455 self.in_ = Capture(0)
456 if out:
457 self.out = Capture(1)
458 if err:
459 self.err = Capture(2)
461 def __repr__(self):
462 return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format(
463 self.out,
464 self.err,
465 self.in_,
466 self._state,
467 getattr(self, "_in_suspended", "<UNSET>"),
468 )
470 def start_capturing(self):
471 self._state = "started"
472 if self.in_:
473 self.in_.start()
474 if self.out:
475 self.out.start()
476 if self.err:
477 self.err.start()
479 def pop_outerr_to_orig(self):
480 """ pop current snapshot out/err capture and flush to orig streams. """
481 out, err = self.readouterr()
482 if out:
483 self.out.writeorg(out)
484 if err:
485 self.err.writeorg(err)
486 return out, err
488 def suspend_capturing(self, in_=False):
489 self._state = "suspended"
490 if self.out:
491 self.out.suspend()
492 if self.err:
493 self.err.suspend()
494 if in_ and self.in_:
495 self.in_.suspend()
496 self._in_suspended = True
498 def resume_capturing(self):
499 self._state = "resumed"
500 if self.out:
501 self.out.resume()
502 if self.err:
503 self.err.resume()
504 if hasattr(self, "_in_suspended"):
505 self.in_.resume()
506 del self._in_suspended
508 def stop_capturing(self):
509 """ stop capturing and reset capturing streams """
510 if self._state == "stopped":
511 raise ValueError("was already stopped")
512 self._state = "stopped"
513 if self.out:
514 self.out.done()
515 if self.err:
516 self.err.done()
517 if self.in_:
518 self.in_.done()
520 def readouterr(self):
521 """ return snapshot unicode value of stdout/stderr capturings. """
522 return CaptureResult(
523 self.out.snap() if self.out is not None else "",
524 self.err.snap() if self.err is not None else "",
525 )
528class NoCapture:
529 EMPTY_BUFFER = None
530 __init__ = start = done = suspend = resume = lambda *args: None
533class FDCaptureBinary:
534 """Capture IO to/from a given os-level filedescriptor.
536 snap() produces `bytes`
537 """
539 EMPTY_BUFFER = b""
540 _state = None
542 def __init__(self, targetfd, tmpfile=None):
543 self.targetfd = targetfd
544 try:
545 self.targetfd_save = os.dup(self.targetfd)
546 except OSError:
547 self.start = lambda: None
548 self.done = lambda: None
549 else:
550 self.start = self._start
551 self.done = self._done
552 if targetfd == 0:
553 assert not tmpfile, "cannot set tmpfile with stdin"
554 tmpfile = open(os.devnull, "r")
555 self.syscapture = SysCapture(targetfd)
556 else:
557 if tmpfile is None:
558 f = TemporaryFile()
559 with f:
560 tmpfile = safe_text_dupfile(f, mode="wb+")
561 if targetfd in patchsysdict:
562 self.syscapture = SysCapture(targetfd, tmpfile)
563 else:
564 self.syscapture = NoCapture()
565 self.tmpfile = tmpfile
566 self.tmpfile_fd = tmpfile.fileno()
568 def __repr__(self):
569 return "<FDCapture {} oldfd={} _state={!r}>".format(
570 self.targetfd, getattr(self, "targetfd_save", None), self._state
571 )
573 def _start(self):
574 """ Start capturing on targetfd using memorized tmpfile. """
575 try:
576 os.fstat(self.targetfd_save)
577 except (AttributeError, OSError):
578 raise ValueError("saved filedescriptor not valid anymore")
579 os.dup2(self.tmpfile_fd, self.targetfd)
580 self.syscapture.start()
581 self._state = "started"
583 def snap(self):
584 self.tmpfile.seek(0)
585 res = self.tmpfile.read()
586 self.tmpfile.seek(0)
587 self.tmpfile.truncate()
588 return res
590 def _done(self):
591 """ stop capturing, restore streams, return original capture file,
592 seeked to position zero. """
593 targetfd_save = self.__dict__.pop("targetfd_save")
594 os.dup2(targetfd_save, self.targetfd)
595 os.close(targetfd_save)
596 self.syscapture.done()
597 self.tmpfile.close()
598 self._state = "done"
600 def suspend(self):
601 self.syscapture.suspend()
602 os.dup2(self.targetfd_save, self.targetfd)
603 self._state = "suspended"
605 def resume(self):
606 self.syscapture.resume()
607 os.dup2(self.tmpfile_fd, self.targetfd)
608 self._state = "resumed"
610 def writeorg(self, data):
611 """ write to original file descriptor. """
612 if isinstance(data, str):
613 data = data.encode("utf8") # XXX use encoding of original stream
614 os.write(self.targetfd_save, data)
617class FDCapture(FDCaptureBinary):
618 """Capture IO to/from a given os-level filedescriptor.
620 snap() produces text
621 """
623 # Ignore type because it doesn't match the type in the superclass (bytes).
624 EMPTY_BUFFER = str() # type: ignore
626 def snap(self):
627 res = super().snap()
628 enc = getattr(self.tmpfile, "encoding", None)
629 if enc and isinstance(res, bytes):
630 res = str(res, enc, "replace")
631 return res
634class SysCapture:
636 EMPTY_BUFFER = str()
637 _state = None
639 def __init__(self, fd, tmpfile=None):
640 name = patchsysdict[fd]
641 self._old = getattr(sys, name)
642 self.name = name
643 if tmpfile is None:
644 if name == "stdin":
645 tmpfile = DontReadFromInput()
646 else:
647 tmpfile = CaptureIO()
648 self.tmpfile = tmpfile
650 def __repr__(self):
651 return "<SysCapture {} _old={!r}, tmpfile={!r} _state={!r}>".format(
652 self.name, self._old, self.tmpfile, self._state
653 )
655 def start(self):
656 setattr(sys, self.name, self.tmpfile)
657 self._state = "started"
659 def snap(self):
660 res = self.tmpfile.getvalue()
661 self.tmpfile.seek(0)
662 self.tmpfile.truncate()
663 return res
665 def done(self):
666 setattr(sys, self.name, self._old)
667 del self._old
668 self.tmpfile.close()
669 self._state = "done"
671 def suspend(self):
672 setattr(sys, self.name, self._old)
673 self._state = "suspended"
675 def resume(self):
676 setattr(sys, self.name, self.tmpfile)
677 self._state = "resumed"
679 def writeorg(self, data):
680 self._old.write(data)
681 self._old.flush()
684class SysCaptureBinary(SysCapture):
685 # Ignore type because it doesn't match the type in the superclass (str).
686 EMPTY_BUFFER = b"" # type: ignore
688 def snap(self):
689 res = self.tmpfile.buffer.getvalue()
690 self.tmpfile.seek(0)
691 self.tmpfile.truncate()
692 return res
695class DontReadFromInput:
696 encoding = None
698 def read(self, *args):
699 raise IOError(
700 "pytest: reading from stdin while output is captured! Consider using `-s`."
701 )
703 readline = read
704 readlines = read
705 __next__ = read
707 def __iter__(self):
708 return self
710 def fileno(self):
711 raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")
713 def isatty(self):
714 return False
716 def close(self):
717 pass
719 @property
720 def buffer(self):
721 return self
724def _colorama_workaround():
725 """
726 Ensure colorama is imported so that it attaches to the correct stdio
727 handles on Windows.
729 colorama uses the terminal on import time. So if something does the
730 first import of colorama while I/O capture is active, colorama will
731 fail in various ways.
732 """
733 if sys.platform.startswith("win32"):
734 try:
735 import colorama # noqa: F401
736 except ImportError:
737 pass
740def _readline_workaround():
741 """
742 Ensure readline is imported so that it attaches to the correct stdio
743 handles on Windows.
745 Pdb uses readline support where available--when not running from the Python
746 prompt, the readline module is not imported until running the pdb REPL. If
747 running pytest with the --pdb option this means the readline module is not
748 imported until after I/O capture has been started.
750 This is a problem for pyreadline, which is often used to implement readline
751 support on Windows, as it does not attach to the correct handles for stdout
752 and/or stdin if they have been redirected by the FDCapture mechanism. This
753 workaround ensures that readline is imported before I/O capture is setup so
754 that it can attach to the actual stdin/out for the console.
756 See https://github.com/pytest-dev/pytest/pull/1281
757 """
758 if sys.platform.startswith("win32"):
759 try:
760 import readline # noqa: F401
761 except ImportError:
762 pass
765def _py36_windowsconsoleio_workaround(stream):
766 """
767 Python 3.6 implemented unicode console handling for Windows. This works
768 by reading/writing to the raw console handle using
769 ``{Read,Write}ConsoleW``.
771 The problem is that we are going to ``dup2`` over the stdio file
772 descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the
773 handles used by Python to write to the console. Though there is still some
774 weirdness and the console handle seems to only be closed randomly and not
775 on the first call to ``CloseHandle``, or maybe it gets reopened with the
776 same handle value when we suspend capturing.
778 The workaround in this case will reopen stdio with a different fd which
779 also means a different handle by replicating the logic in
780 "Py_lifecycle.c:initstdio/create_stdio".
782 :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given
783 here as parameter for unittesting purposes.
785 See https://github.com/pytest-dev/py/issues/103
786 """
787 if (
788 not sys.platform.startswith("win32")
789 or sys.version_info[:2] < (3, 6)
790 or hasattr(sys, "pypy_version_info")
791 ):
792 return
794 # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666)
795 if not hasattr(stream, "buffer"):
796 return
798 buffered = hasattr(stream.buffer, "raw")
799 raw_stdout = stream.buffer.raw if buffered else stream.buffer
801 if not isinstance(raw_stdout, io._WindowsConsoleIO):
802 return
804 def _reopen_stdio(f, mode):
805 if not buffered and mode[0] == "w":
806 buffering = 0
807 else:
808 buffering = -1
810 return io.TextIOWrapper(
811 open(os.dup(f.fileno()), mode, buffering),
812 f.encoding,
813 f.errors,
814 f.newlines,
815 f.line_buffering,
816 )
818 sys.stdin = _reopen_stdio(sys.stdin, "rb")
819 sys.stdout = _reopen_stdio(sys.stdout, "wb")
820 sys.stderr = _reopen_stdio(sys.stderr, "wb")