Spaces:
Runtime error
Runtime error
File size: 1,202 Bytes
41d1bc5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
def timeout_handler(_, __):
    """Raise ``TimeoutError`` unconditionally; both arguments are ignored.

    NOTE(review): the two-argument shape presumably matches the
    ``signal.signal`` handler convention (signum, frame) -- no registration
    is visible here, so confirm against the caller.

    Raises:
        TimeoutError: always.
    """
    raise TimeoutError
import os, json
def to_jsonl(dict_data, file_path):
    """Append ``dict_data`` to ``file_path`` as a single JSON Lines record.

    The object is serialised with ``json.dumps`` and written on its own line.
    The file is opened in append mode, so it is created when missing and
    existing records are kept.

    Args:
        dict_data: JSON-serialisable object to write.
        file_path: Path of the ``.jsonl`` file to append to.

    Raises:
        TypeError: if ``dict_data`` is not JSON-serialisable (from
            ``json.dumps``).
        OSError: if the file cannot be opened or written.
    """
    # Serialise first so a non-serialisable object fails before the file is
    # created or touched.
    json_line = json.dumps(dict_data)
    with open(file_path, 'a') as file:
        # Text mode already translates "\n" into the platform line ending;
        # writing os.linesep here would yield "\r\r\n" on Windows.
        file.write(json_line + '\n')
from threading import Thread
class PropagatingThread(Thread):
    """Thread that hands its target's outcome back through ``join``.

    ``run`` stores the target's return value in ``ret`` or the exception it
    raised in ``exc``. ``join`` then returns the former, or re-raises the
    latter in the thread that called ``join``.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``Thread`` and pre-set the outcome slots."""
        super(PropagatingThread, self).__init__(*args, **kwargs)
        # Defined up front so join() can be called while the target is still
        # running (for example after a timed-out join), before run() has
        # stored anything; otherwise join() fails with AttributeError.
        self.exc = None
        self.ret = None

    def run(self):
        """Invoke the target, recording its return value or raised exception."""
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            # BaseException is caught, so even SystemExit/KeyboardInterrupt
            # raised by the target are kept for join() to re-raise.
            self.exc = e

    def join(self, timeout=None):
        """Wait for the thread, then surface the target's outcome.

        Args:
            timeout: Maximum number of seconds to wait, or ``None`` to wait
                until the thread finishes.

        Returns:
            The target's return value. ``None`` when the wait timed out and
            the target has not finished; use ``is_alive()`` to tell the two
            cases apart.

        Raises:
            BaseException: whatever exception the target raised.
        """
        super(PropagatingThread, self).join(timeout)
        # Compare against None: an exception object may define a falsy
        # __bool__/__len__ and must still be re-raised.
        if self.exc is not None:
            raise self.exc
        return self.ret
def function_with_timeout(func, args, timeout):
    """Run ``func(*args)`` on a ``PropagatingThread`` and wait for its result.

    Args:
        func: Callable to execute on the worker thread.
        args: Iterable of positional arguments unpacked into ``func``.
        timeout: Value passed to the worker's ``join`` as the maximum wait,
            in seconds.

    Returns:
        The value returned by ``func(*args)``.

    Raises:
        TimeoutError: if the worker is still alive once ``join`` returns.
        Any exception raised by the worker's ``join``, which re-raises what
        ``func`` raised.

    The worker is not stopped on timeout; nothing here interrupts ``func``.
    """
    outcome = []

    def runner():
        value = func(*args)
        outcome.append(value)

    worker = PropagatingThread(target=runner)
    worker.start()
    worker.join(timeout)
    if not worker.is_alive():
        return outcome[0]
    raise TimeoutError()
|