|
import unittest |
|
import asyncio |
|
|
|
from iterators import AsyncTimeoutIterator |
|
|
|
|
|
async def iter_simple(): |
|
yield 1 |
|
yield 2 |
|
|
|
|
|
async def iter_with_sleep(): |
|
yield 1 |
|
await asyncio.sleep(0.6) |
|
yield 2 |
|
await asyncio.sleep(0.4) |
|
yield 3 |
|
|
|
|
|
async def iter_with_exception(): |
|
yield 1 |
|
yield 2 |
|
raise Exception |
|
yield 3 |
|
|
|
|
|
class TestTimeoutIterator(unittest.TestCase): |
|
|
|
def test_normal_iteration(self): |
|
|
|
async def _(self): |
|
i = iter_simple() |
|
it = AsyncTimeoutIterator(i) |
|
|
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), 2) |
|
|
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_normal_iteration_for_loop(self): |
|
|
|
async def _(self): |
|
i = iter_simple() |
|
it = AsyncTimeoutIterator(i) |
|
iterResults = [] |
|
async for x in it: |
|
iterResults.append(x) |
|
self.assertEqual(iterResults, [1, 2]) |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_timeout_block(self): |
|
|
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i) |
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), 2) |
|
self.assertEqual(await it.__anext__(), 3) |
|
|
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_timeout_block_for_loop(self): |
|
|
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i) |
|
iterResults = [] |
|
async for x in it: |
|
iterResults.append(x) |
|
self.assertEqual(iterResults, [1, 2, 3]) |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_fixed_timeout(self): |
|
|
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i, timeout=0.5) |
|
|
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), it.get_sentinel()) |
|
self.assertEqual(await it.__anext__(), 2) |
|
self.assertEqual(await it.__anext__(), 3) |
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_fixed_timeout(self): |
|
|
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i, timeout=0.5) |
|
iterResults = [] |
|
async for x in it: |
|
iterResults.append(x) |
|
self.assertEqual(iterResults, [1, it.get_sentinel(), 2, 3]) |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_timeout_update(self): |
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i, timeout=0.5) |
|
|
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), it.get_sentinel()) |
|
|
|
it.set_timeout(0.3) |
|
self.assertEqual(await it.__anext__(), 2) |
|
self.assertEqual(await it.__anext__(), it.get_sentinel()) |
|
|
|
self.assertEqual(await it.__anext__(), 3) |
|
|
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_custom_sentinel(self): |
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i, timeout=0.5, sentinel="END") |
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), "END") |
|
|
|
self.assertEqual(await it.__anext__(), 2) |
|
self.assertEqual(await it.__anext__(), 3) |
|
|
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_feature_timeout_reset(self): |
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i, timeout=0.5, reset_on_next=True) |
|
|
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), 2) |
|
self.assertEqual(await it.__anext__(), 3) |
|
|
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_function_set_reset_on_next(self): |
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i, timeout=0.35, reset_on_next=False) |
|
|
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), it.get_sentinel()) |
|
it.set_reset_on_next(True) |
|
self.assertEqual(await it.__anext__(), 2) |
|
self.assertEqual(await it.__anext__(), 3) |
|
|
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_iterator_raises_exception(self): |
|
async def _(self): |
|
i = iter_with_exception() |
|
it = AsyncTimeoutIterator(i, timeout=0.5, sentinel="END") |
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), 2) |
|
|
|
with self.assertRaises(Exception): |
|
await it.__anext__() |
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|
|
def test_interrupt_thread(self): |
|
async def _(self): |
|
i = iter_with_sleep() |
|
it = AsyncTimeoutIterator(i, timeout=0.5, sentinel="END") |
|
self.assertEqual(await it.__anext__(), 1) |
|
self.assertEqual(await it.__anext__(), it.get_sentinel()) |
|
it.interrupt() |
|
self.assertEqual(await it.__anext__(), 2) |
|
|
|
with self.assertRaises(StopAsyncIteration): |
|
await it.__anext__() |
|
|
|
asyncio.get_event_loop().run_until_complete(_(self)) |
|
|