Compare commits
3 Commits
f2ce4a3469
...
67f673bf36
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 67f673bf36 | |
Tyler Goodlet | 3a105e2830 | |
Tyler Goodlet | a315f01acc |
|
@ -6,7 +6,6 @@ sync-opening a ``tractor.Context`` beforehand.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from itertools import count
|
from itertools import count
|
||||||
import math
|
|
||||||
import platform
|
import platform
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import (
|
from typing import (
|
||||||
|
@ -873,7 +872,7 @@ def test_one_end_stream_not_opened(
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
with trio.fail_after(0.8):
|
with trio.fail_after(1):
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
entrypoint,
|
entrypoint,
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
|
@ -1060,63 +1059,54 @@ def test_maybe_allow_overruns_stream(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
)
|
)
|
||||||
|
seq = list(range(10))
|
||||||
|
async with portal.open_context(
|
||||||
|
echo_back_sequence,
|
||||||
|
seq=seq,
|
||||||
|
wait_for_cancel=cancel_ctx,
|
||||||
|
be_slow=(slow_side == 'child'),
|
||||||
|
allow_overruns_side=allow_overruns_side,
|
||||||
|
|
||||||
# stream-sequence batch info with send delay to determine
|
) as (ctx, sent):
|
||||||
# approx timeout determining whether test has hung.
|
assert sent is None
|
||||||
total_batches: int = 2
|
|
||||||
num_items: int = 10
|
|
||||||
seq = list(range(num_items))
|
|
||||||
parent_send_delay: float = 0.16
|
|
||||||
timeout: float = math.ceil(
|
|
||||||
total_batches * num_items * parent_send_delay
|
|
||||||
)
|
|
||||||
with trio.fail_after(timeout):
|
|
||||||
async with portal.open_context(
|
|
||||||
echo_back_sequence,
|
|
||||||
seq=seq,
|
|
||||||
wait_for_cancel=cancel_ctx,
|
|
||||||
be_slow=(slow_side == 'child'),
|
|
||||||
allow_overruns_side=allow_overruns_side,
|
|
||||||
|
|
||||||
) as (ctx, sent):
|
async with ctx.open_stream(
|
||||||
assert sent is None
|
msg_buffer_size=1 if slow_side == 'parent' else None,
|
||||||
|
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
|
||||||
|
) as stream:
|
||||||
|
|
||||||
async with ctx.open_stream(
|
total_batches: int = 2
|
||||||
msg_buffer_size=1 if slow_side == 'parent' else None,
|
for _ in range(total_batches):
|
||||||
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
|
for msg in seq:
|
||||||
) as stream:
|
# print(f'root tx {msg}')
|
||||||
|
await stream.send(msg)
|
||||||
|
if slow_side == 'parent':
|
||||||
|
# NOTE: we make the parent slightly
|
||||||
|
# slower, when it is slow, to make sure
|
||||||
|
# that in the overruns everywhere case
|
||||||
|
await trio.sleep(0.16)
|
||||||
|
|
||||||
for _ in range(total_batches):
|
batch = []
|
||||||
for msg in seq:
|
async for msg in stream:
|
||||||
# print(f'root tx {msg}')
|
print(f'root rx {msg}')
|
||||||
await stream.send(msg)
|
batch.append(msg)
|
||||||
if slow_side == 'parent':
|
if batch == seq:
|
||||||
# NOTE: we make the parent slightly
|
break
|
||||||
# slower, when it is slow, to make sure
|
|
||||||
# that in the overruns everywhere case
|
|
||||||
await trio.sleep(parent_send_delay)
|
|
||||||
|
|
||||||
batch = []
|
|
||||||
async for msg in stream:
|
|
||||||
print(f'root rx {msg}')
|
|
||||||
batch.append(msg)
|
|
||||||
if batch == seq:
|
|
||||||
break
|
|
||||||
|
|
||||||
if cancel_ctx:
|
|
||||||
# cancel the remote task
|
|
||||||
print('Requesting `ctx.cancel()` in parent!')
|
|
||||||
await ctx.cancel()
|
|
||||||
|
|
||||||
res: str|ContextCancelled = await ctx.result()
|
|
||||||
|
|
||||||
if cancel_ctx:
|
if cancel_ctx:
|
||||||
assert isinstance(res, ContextCancelled)
|
# cancel the remote task
|
||||||
assert tuple(res.canceller) == current_actor().uid
|
print('Requesting `ctx.cancel()` in parent!')
|
||||||
|
await ctx.cancel()
|
||||||
|
|
||||||
else:
|
res: str|ContextCancelled = await ctx.result()
|
||||||
print(f'RX ROOT SIDE RESULT {res}')
|
|
||||||
assert res == 'yo'
|
if cancel_ctx:
|
||||||
|
assert isinstance(res, ContextCancelled)
|
||||||
|
assert tuple(res.canceller) == current_actor().uid
|
||||||
|
|
||||||
|
else:
|
||||||
|
print(f'RX ROOT SIDE RESULT {res}')
|
||||||
|
assert res == 'yo'
|
||||||
|
|
||||||
# cancel the daemon
|
# cancel the daemon
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
|
@ -438,8 +438,8 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||||
'msgspec_codec',
|
'msgspec_codec',
|
||||||
|
|
||||||
# TODO: move this to our new `Msg`-spec!
|
# TODO: move this to our new `Msg`-spec!
|
||||||
default=_def_msgspec_codec,
|
# default=_def_msgspec_codec,
|
||||||
# default=_def_tractor_codec,
|
default=_def_tractor_codec,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue