forked from goodboy/tractor
commit
30986d6b64
|
@ -4,12 +4,17 @@ Advanced streaming patterns using bidirectional streams and contexts.
|
|||
'''
|
||||
from collections import Counter
|
||||
import itertools
|
||||
import platform
|
||||
from typing import Set, Dict, List
|
||||
|
||||
import trio
|
||||
import tractor
|
||||
|
||||
|
||||
def is_win():
|
||||
return platform.system() == 'Windows'
|
||||
|
||||
|
||||
_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = {
|
||||
'even': set(),
|
||||
'odd': set(),
|
||||
|
@ -173,14 +178,22 @@ async def one_task_streams_and_one_handles_reqresp(
|
|||
|
||||
|
||||
def test_reqresp_ontopof_streaming():
|
||||
'''Test a subactor that both streams with one task and
|
||||
'''
|
||||
Test a subactor that both streams with one task and
|
||||
spawns another which handles a small requests-response
|
||||
dialogue over the same bidir-stream.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
||||
with trio.move_on_after(2):
|
||||
# flat to make sure we get at least one pong
|
||||
got_pong: bool = False
|
||||
timeout: int = 2
|
||||
|
||||
if is_win(): # smh
|
||||
timeout = 4
|
||||
|
||||
with trio.move_on_after(timeout):
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
# name of this actor will be same as target func
|
||||
|
@ -189,9 +202,6 @@ def test_reqresp_ontopof_streaming():
|
|||
enable_modules=[__name__]
|
||||
)
|
||||
|
||||
# flat to make sure we get at least one pong
|
||||
got_pong: bool = False
|
||||
|
||||
async with portal.open_context(
|
||||
one_task_streams_and_one_handles_reqresp,
|
||||
|
||||
|
@ -243,8 +253,12 @@ def test_sigint_both_stream_types():
|
|||
side-by-side will cancel correctly from SIGINT.
|
||||
|
||||
'''
|
||||
timeout: float = 2
|
||||
if is_win(): # smh
|
||||
timeout += 1
|
||||
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
with trio.fail_after(timeout):
|
||||
async with tractor.open_nursery() as n:
|
||||
# name of this actor will be same as target func
|
||||
portal = await n.start_actor(
|
||||
|
|
|
@ -15,6 +15,10 @@ import tractor
|
|||
from conftest import tractor_test, no_windows
|
||||
|
||||
|
||||
def is_win():
|
||||
return platform.system() == 'Windows'
|
||||
|
||||
|
||||
async def assert_err(delay=0):
|
||||
await trio.sleep(delay)
|
||||
assert 0
|
||||
|
@ -332,10 +336,12 @@ async def spawn_and_error(breadth, depth) -> None:
|
|||
|
||||
@tractor_test
|
||||
async def test_nested_multierrors(loglevel, start_method):
|
||||
"""Test that failed actor sets are wrapped in `trio.MultiError`s.
|
||||
This test goes only 2 nurseries deep but we should eventually have tests
|
||||
'''
|
||||
Test that failed actor sets are wrapped in `trio.MultiError`s. This
|
||||
test goes only 2 nurseries deep but we should eventually have tests
|
||||
for arbitrary n-depth actor trees.
|
||||
"""
|
||||
|
||||
'''
|
||||
if start_method == 'trio':
|
||||
depth = 3
|
||||
subactor_breadth = 2
|
||||
|
@ -364,7 +370,7 @@ async def test_nested_multierrors(loglevel, start_method):
|
|||
for subexc in err.exceptions:
|
||||
|
||||
# verify first level actor errors are wrapped as remote
|
||||
if platform.system() == 'Windows':
|
||||
if is_win():
|
||||
|
||||
# windows is often too slow and cancellation seems
|
||||
# to happen before an actor is spawned
|
||||
|
@ -397,15 +403,21 @@ async def test_nested_multierrors(loglevel, start_method):
|
|||
# XXX not sure what's up with this..
|
||||
# on windows sometimes spawning is just too slow and
|
||||
# we get back the (sent) cancel signal instead
|
||||
if platform.system() == 'Windows':
|
||||
if is_win():
|
||||
if isinstance(subexc, tractor.RemoteActorError):
|
||||
assert subexc.type in (trio.MultiError, tractor.RemoteActorError)
|
||||
assert subexc.type in (
|
||||
trio.MultiError,
|
||||
tractor.RemoteActorError
|
||||
)
|
||||
else:
|
||||
assert isinstance(subexc, trio.MultiError)
|
||||
else:
|
||||
assert subexc.type is trio.MultiError
|
||||
else:
|
||||
assert subexc.type in (tractor.RemoteActorError, trio.Cancelled)
|
||||
assert subexc.type in (
|
||||
tractor.RemoteActorError,
|
||||
trio.Cancelled
|
||||
)
|
||||
|
||||
|
||||
@no_windows
|
||||
|
@ -443,6 +455,9 @@ def test_cancel_via_SIGINT_other_task(
|
|||
from a seperate ``trio`` child task.
|
||||
"""
|
||||
pid = os.getpid()
|
||||
timeout: float = 2
|
||||
if is_win(): # smh
|
||||
timeout += 1
|
||||
|
||||
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
|
||||
async with tractor.open_nursery() as tn:
|
||||
|
@ -456,7 +471,7 @@ def test_cancel_via_SIGINT_other_task(
|
|||
|
||||
async def main():
|
||||
# should never timeout since SIGINT should cancel the current program
|
||||
with trio.fail_after(2):
|
||||
with trio.fail_after(timeout):
|
||||
async with trio.open_nursery() as n:
|
||||
await n.start(spawn_and_sleep_forever)
|
||||
if spawn_backend == 'mp':
|
||||
|
@ -524,6 +539,10 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
|
|||
|
||||
'''
|
||||
kbi_delay = 0.5
|
||||
timeout: float = 2.9
|
||||
|
||||
if is_win(): # smh
|
||||
timeout += 1
|
||||
|
||||
async def main():
|
||||
start = time.time()
|
||||
|
@ -548,7 +567,7 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
|
|||
await p.run(do_nuthin)
|
||||
finally:
|
||||
duration = time.time() - start
|
||||
if duration > 2.9:
|
||||
if duration > timeout:
|
||||
raise trio.TooSlowError(
|
||||
'daemon cancel was slower then necessary..'
|
||||
)
|
||||
|
|
|
@ -250,7 +250,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
|
|||
|
||||
results, diff = time_quad_ex
|
||||
assert results
|
||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.5
|
||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666
|
||||
assert diff < this_fast
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue