Spaces:
Running
Running
File size: 4,298 Bytes
4dfb78b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
"""
Based on sacred/stdout_capturing.py in project Sacred
https://github.com/IDSIA/sacred
Author: Paul-Edouard Sarlin (skydes)
"""
from __future__ import division, print_function, unicode_literals
import os
import subprocess
import sys
from contextlib import contextmanager
from threading import Timer
def apply_backspaces_and_linefeeds(text):
    """
    Render backspaces and carriage returns in text like a terminal would.

    The text is processed one newline-separated line at a time. Each line is
    replayed onto a character buffer with a cursor: a carriage return moves
    the cursor back to the start of the line, a backspace moves it one
    position to the left (never before the start), and any other character
    is written at the cursor, overwriting whatever was already there.

    A carriage return that is the very last character of the whole text is
    kept as-is so the result can be concatenated with the next output chunk.
    """
    lines = text.split("\n")
    last_line_idx = len(lines) - 1
    rendered = []
    for line_idx, line in enumerate(lines):
        is_last_line = line_idx == last_line_idx
        last_char_idx = len(line) - 1
        buffer = []
        pos = 0
        for char_idx, char in enumerate(line):
            if char == "\r":
                if is_last_line and char_idx == last_char_idx:
                    # Trailing carriage return of the whole text: keep it at
                    # the end of the line instead of applying it.
                    buffer.append(char)
                    pos = len(buffer)
                else:
                    pos = 0
                continue
            if char == "\b":
                pos = max(0, pos - 1)
                continue
            # Ordinary character: append at the end or overwrite in place.
            if pos == len(buffer):
                buffer.append(char)
            else:
                buffer[pos] = char
            pos += 1
        rendered.append("".join(buffer))
    return "\n".join(rendered)
def flush():
    """Try to flush Python's stdout and stderr buffers, ignoring failures.

    Only the Python-level ``sys.stdout`` and ``sys.stderr`` objects are
    flushed. Each stream is flushed independently, so a stream that is
    missing, closed or otherwise unflushable does not prevent the other one
    from being flushed.
    """
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (AttributeError, ValueError, IOError):
            pass  # unsupported (e.g. stream is None or already closed)
# Duplicate stdout and stderr to a file. Inspired by:
# http://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
# http://stackoverflow.com/a/651718/1388435
# http://stackoverflow.com/a/22434262/1388435
@contextmanager
def capture_outputs(filename):
    """Duplicate stdout and stderr to a file on the file descriptor level.

    While the context is active, file descriptors 1 and 2 are redirected to
    the stdin pipes of two ``tee`` subprocesses. Each ``tee`` forwards what
    it receives to the original stream and appends a copy to ``filename``.

    On exit the original descriptors are restored and the ``tee`` processes
    are waited for (they are killed if they have not finished after one
    second). If the managed block finished without raising, the log file is
    then rewritten with backspaces and carriage returns applied, so that
    progress-bar style output is collapsed the way a terminal displays it.

    ``filename`` is converted with ``str()`` and opened in append mode, so
    existing content is kept.
    """
    with open(str(filename), "a+") as target:
        original_stdout_fd = 1
        original_stderr_fd = 2
        target_fd = target.fileno()

        # Save a copy of the original stdout and stderr file descriptors
        saved_stdout_fd = os.dup(original_stdout_fd)
        saved_stderr_fd = os.dup(original_stderr_fd)

        # Each tee copies its stdin to its own stdout (the original stream)
        # and appends to /dev/stderr, which is bound to the log file here.
        tee_stdout = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=1,
        )
        tee_stderr = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=2,
        )

        # Flush pending output before swapping the descriptors so that it
        # still goes to the original destinations.
        flush()
        os.dup2(tee_stdout.stdin.fileno(), original_stdout_fd)
        os.dup2(tee_stderr.stdin.fileno(), original_stderr_fd)

        try:
            yield
        finally:
            flush()

            # then redirect stdout back to the saved fd
            tee_stdout.stdin.close()
            tee_stderr.stdin.close()

            # restore original fds
            os.dup2(saved_stdout_fd, original_stdout_fd)
            os.dup2(saved_stderr_fd, original_stderr_fd)

            # wait for completion of the tee processes with timeout
            # implemented using a timer because timeout support is py3 only
            def kill_tees():
                tee_stdout.kill()
                tee_stderr.kill()

            tee_timer = Timer(1, kill_tees)
            try:
                tee_timer.start()
                tee_stdout.wait()
                tee_stderr.wait()
            finally:
                tee_timer.cancel()

            os.close(saved_stdout_fd)
            os.close(saved_stderr_fd)

    # Cleanup log file.
    # newline="" disables universal-newline translation: with the default
    # text mode every lone "\r" would be turned into "\n" on read, and the
    # carriage returns could never be applied by the cleanup below.
    with open(str(filename), "r", newline="") as target:
        text = target.read()
    text = apply_backspaces_and_linefeeds(text)
    with open(str(filename), "w", newline="") as target:
        target.write(text)
|