Spaces:
Runtime error
Runtime error
# encoding: utf-8 | |
""" | |
IO related utilities. | |
""" | |
# Copyright (c) IPython Development Team. | |
# Distributed under the terms of the Modified BSD License. | |
import atexit | |
import os | |
import sys | |
import tempfile | |
from pathlib import Path | |
from warnings import warn | |
from IPython.utils.decorators import undoc | |
from .capture import CapturedIO, capture_output | |
class Tee(object): | |
"""A class to duplicate an output stream to stdout/err. | |
This works in a manner very similar to the Unix 'tee' command. | |
When the object is closed or deleted, it closes the original file given to | |
it for duplication. | |
""" | |
# Inspired by: | |
# http://mail.python.org/pipermail/python-list/2007-May/442737.html | |
def __init__(self, file_or_name, mode="w", channel='stdout'): | |
"""Construct a new Tee object. | |
Parameters | |
---------- | |
file_or_name : filename or open filehandle (writable) | |
File that will be duplicated | |
mode : optional, valid mode for open(). | |
If a filename was give, open with this mode. | |
channel : str, one of ['stdout', 'stderr'] | |
""" | |
if channel not in ['stdout', 'stderr']: | |
raise ValueError('Invalid channel spec %s' % channel) | |
if hasattr(file_or_name, 'write') and hasattr(file_or_name, 'seek'): | |
self.file = file_or_name | |
else: | |
encoding = None if "b" in mode else "utf-8" | |
self.file = open(file_or_name, mode, encoding=encoding) | |
self.channel = channel | |
self.ostream = getattr(sys, channel) | |
setattr(sys, channel, self) | |
self._closed = False | |
def close(self): | |
"""Close the file and restore the channel.""" | |
self.flush() | |
setattr(sys, self.channel, self.ostream) | |
self.file.close() | |
self._closed = True | |
def write(self, data): | |
"""Write data to both channels.""" | |
self.file.write(data) | |
self.ostream.write(data) | |
self.ostream.flush() | |
def flush(self): | |
"""Flush both channels.""" | |
self.file.flush() | |
self.ostream.flush() | |
def __del__(self): | |
if not self._closed: | |
self.close() | |
def ask_yes_no(prompt, default=None, interrupt=None): | |
"""Asks a question and returns a boolean (y/n) answer. | |
If default is given (one of 'y','n'), it is used if the user input is | |
empty. If interrupt is given (one of 'y','n'), it is used if the user | |
presses Ctrl-C. Otherwise the question is repeated until an answer is | |
given. | |
An EOF is treated as the default answer. If there is no default, an | |
exception is raised to prevent infinite loops. | |
Valid answers are: y/yes/n/no (match is not case sensitive).""" | |
answers = {'y':True,'n':False,'yes':True,'no':False} | |
ans = None | |
while ans not in answers.keys(): | |
try: | |
ans = input(prompt+' ').lower() | |
if not ans: # response was an empty string | |
ans = default | |
except KeyboardInterrupt: | |
if interrupt: | |
ans = interrupt | |
print("\r") | |
except EOFError: | |
if default in answers.keys(): | |
ans = default | |
print() | |
else: | |
raise | |
return answers[ans] | |
def temp_pyfile(src, ext='.py'): | |
"""Make a temporary python file, return filename and filehandle. | |
Parameters | |
---------- | |
src : string or list of strings (no need for ending newlines if list) | |
Source code to be written to the file. | |
ext : optional, string | |
Extension for the generated file. | |
Returns | |
------- | |
(filename, open filehandle) | |
It is the caller's responsibility to close the open file and unlink it. | |
""" | |
fname = tempfile.mkstemp(ext)[1] | |
with open(Path(fname), "w", encoding="utf-8") as f: | |
f.write(src) | |
f.flush() | |
return fname | |
def raw_print(*args, **kw): | |
"""DEPRECATED: Raw print to sys.__stdout__, otherwise identical interface to print().""" | |
warn("IPython.utils.io.raw_print has been deprecated since IPython 7.0", DeprecationWarning, stacklevel=2) | |
print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'), | |
file=sys.__stdout__) | |
sys.__stdout__.flush() | |
def raw_print_err(*args, **kw): | |
"""DEPRECATED: Raw print to sys.__stderr__, otherwise identical interface to print().""" | |
warn("IPython.utils.io.raw_print_err has been deprecated since IPython 7.0", DeprecationWarning, stacklevel=2) | |
print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'), | |
file=sys.__stderr__) | |
sys.__stderr__.flush() | |