Spaces:
Running
Running
""" | |
Based on sacred/stdout_capturing.py in project Sacred | |
https://github.com/IDSIA/sacred | |
Author: Paul-Edouard Sarlin (skydes) | |
""" | |
from __future__ import division, print_function, unicode_literals | |
import os | |
import subprocess | |
import sys | |
from contextlib import contextmanager | |
from threading import Timer | |
def apply_backspaces_and_linefeeds(text):
    """
    Render text the way a terminal would display it.

    Every line is replayed onto a character buffer with a cursor: a carriage
    return sends the cursor back to column 0 and a backspace moves it one
    column to the left (never past column 0), so that the characters that
    follow overwrite what was already written.

    A carriage return that is the very last character of the text is kept
    as-is, so the result stays concatenable with the next output chunk.
    """
    lines = text.split("\n")
    final_line_no = len(lines) - 1
    rendered = []
    for line_no, line in enumerate(lines):
        buffer = []
        column = 0
        final_char_no = len(line) - 1
        for char_no, char in enumerate(line):
            if char == "\b":
                column = max(0, column - 1)
                continue
            if char == "\r":
                is_trailing = (
                    line_no == final_line_no and char_no == final_char_no
                )
                if not is_trailing:
                    column = 0
                    continue
                # Trailing carriage return: keep it, placed after everything
                # already in the buffer.
                column = len(buffer)
            if column < len(buffer):
                buffer[column] = char
            else:
                buffer.append(char)
            column += 1
        rendered.append("".join(buffer))
    return "\n".join(rendered)
def flush():
    """Try to flush Python's ``sys.stdout`` and ``sys.stderr`` buffers.

    This is a best-effort operation: a stream that is missing, closed or
    otherwise cannot be flushed is silently skipped. Each stream is handled
    independently, so a failure on stdout does not prevent stderr from being
    flushed. Only the Python-level stream objects are flushed here.
    """
    for stream_name in ("stdout", "stderr"):
        try:
            # Look the stream up at call time: it may have been replaced
            # (or set to None) since this module was imported.
            getattr(sys, stream_name).flush()
        except (AttributeError, ValueError, IOError):
            pass  # unsupported
# Duplicate stdout and stderr to a file. Inspired by:
# http://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
# http://stackoverflow.com/a/651718/1388435
# http://stackoverflow.com/a/22434262/1388435
@contextmanager
def capture_outputs(filename):
    """Duplicate stdout and stderr to a file on the file descriptor level.

    Context manager. While the ``with`` body runs, file descriptors 1 and 2
    are redirected into the stdin pipes of two ``tee`` subprocesses. Each
    ``tee`` forwards what it reads to the original stream and appends it to
    ``filename`` (reached through ``/dev/stderr``, which is bound to the
    opened log file for the subprocess).

    On exit the original descriptors are restored and the ``tee`` processes
    are waited for; they are killed if they have not finished after one
    second. When the body completes without raising, the log file is then
    rewritten with backspaces and carriage returns applied as a terminal
    would. If the body raises, the descriptors are still restored but this
    final clean-up pass is skipped.

    Args:
        filename: Path of the log file; it is converted with ``str()`` and
            opened in append mode.
    """
    with open(str(filename), "a+") as target:
        original_stdout_fd = 1
        original_stderr_fd = 2
        target_fd = target.fileno()

        # Save a copy of the original stdout and stderr file descriptors
        saved_stdout_fd = os.dup(original_stdout_fd)
        saved_stderr_fd = os.dup(original_stderr_fd)

        # `tee -a /dev/stderr` appends to its own stderr, which is the log
        # file here, while its stdout is the stream being duplicated.
        tee_stdout = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=1,
        )
        tee_stderr = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=2,
        )

        # Flush pending output before swapping the descriptors, then point
        # fds 1 and 2 at the write ends of the tee pipes.
        flush()
        os.dup2(tee_stdout.stdin.fileno(), original_stdout_fd)
        os.dup2(tee_stderr.stdin.fileno(), original_stderr_fd)

        try:
            yield
        finally:
            flush()

            # Close our handles on the pipes; fds 1 and 2 still reference
            # them until they are restored just below.
            tee_stdout.stdin.close()
            tee_stderr.stdin.close()

            # restore original fds
            os.dup2(saved_stdout_fd, original_stdout_fd)
            os.dup2(saved_stderr_fd, original_stderr_fd)

            # Wait for completion of the tee processes, killing them if they
            # are still running after one second.
            def kill_tees():
                tee_stdout.kill()
                tee_stderr.kill()

            tee_timer = Timer(1, kill_tees)
            try:
                tee_timer.start()
                tee_stdout.wait()
                tee_stderr.wait()
            finally:
                tee_timer.cancel()

            os.close(saved_stdout_fd)
            os.close(saved_stderr_fd)

    # Cleanup log file
    with open(str(filename), "r") as target:
        text = target.read()
    text = apply_backspaces_and_linefeeds(text)
    with open(str(filename), "w") as target:
        target.write(text)