Add stream open before started, detailed semantics comment
parent
df59071747
commit
703dee8a59
|
@ -16,16 +16,99 @@ from tractor._exceptions import StreamOverrun
|
|||
|
||||
from conftest import tractor_test
|
||||
|
||||
# the general stream semantics are
|
||||
# - normal termination: far end relays a stop message which
|
||||
# terminates an ongoing ``MsgStream`` iteration
|
||||
# - cancel termination: context is cancelled on either side cancelling
|
||||
# the "linked" inter-actor task context
|
||||
# ``Context`` semantics are as follows,
|
||||
# ------------------------------------
|
||||
|
||||
# - standard setup/teardown:
|
||||
# ``Portal.open_context()`` starts a new
|
||||
# remote task context in another actor. The target actor's task must
|
||||
# call ``Context.started()`` to unblock this entry on the caller side.
|
||||
# the callee task executes until complete and returns a final value
|
||||
# which is delivered to the caller side and retreived via
|
||||
# ``Context.result()``.
|
||||
|
||||
# - cancel termination:
|
||||
# context can be cancelled on either side where either end's task can
|
||||
# call ``Context.cancel()`` which raises a local ``trio.Cancelled``
|
||||
# and sends a task cancel request to the remote task which in turn
|
||||
# raises a ``trio.Cancelled`` in that scope, catches it, and re-raises
|
||||
# as ``ContextCancelled``. This is then caught by
|
||||
# ``Portal.open_context()``'s exit and we get a graceful termination
|
||||
# of the linked tasks.
|
||||
|
||||
# - error termination:
|
||||
# error is caught after all context-cancel-scope tasks are cancelled
|
||||
# via regular ``trio`` cancel scope semantics, error is sent to other
|
||||
# side and unpacked as a `RemoteActorError`.
|
||||
|
||||
|
||||
# ``Context.open_stream() as stream: MsgStream:`` msg semantics are:
|
||||
# -----------------------------------------------------------------
|
||||
|
||||
# - either side can ``.send()`` which emits a 'yield' msgs and delivers
|
||||
# a value to the a ``MsgStream.receive()`` call.
|
||||
|
||||
# - stream closure: one end relays a 'stop' message which terminates an
|
||||
# ongoing ``MsgStream`` iteration.
|
||||
|
||||
# - cancel/error termination: as per the context semantics above but
|
||||
# with implicit stream closure on the cancelling end.
|
||||
|
||||
|
||||
_state: bool = False
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def too_many_starteds(
|
||||
ctx: tractor.Context,
|
||||
) -> None:
|
||||
'''
|
||||
Call ``Context.started()`` more then once (an error).
|
||||
|
||||
'''
|
||||
await ctx.started()
|
||||
try:
|
||||
await ctx.started()
|
||||
except RuntimeError:
|
||||
raise
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def not_started_but_stream_opened(
|
||||
ctx: tractor.Context,
|
||||
) -> None:
|
||||
'''
|
||||
Enter ``Context.open_stream()`` without calling ``.started()``.
|
||||
|
||||
'''
|
||||
try:
|
||||
async with ctx.open_stream():
|
||||
assert 0
|
||||
except RuntimeError:
|
||||
raise
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'target',
|
||||
[too_many_starteds, not_started_but_stream_opened],
|
||||
ids='misuse_type={}'.format,
|
||||
)
|
||||
def test_started_misuse(target):
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.start_actor(
|
||||
target.__name__,
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async with portal.open_context(target) as (ctx, sent):
|
||||
await trio.sleep(1)
|
||||
|
||||
with pytest.raises(tractor.RemoteActorError):
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def simple_setup_teardown(
|
||||
|
||||
|
@ -378,10 +461,18 @@ async def cancel_self(
|
|||
_state = True
|
||||
|
||||
await ctx.cancel()
|
||||
|
||||
# should inline raise immediately
|
||||
try:
|
||||
async with ctx.open_stream():
|
||||
pass
|
||||
except ContextCancelled:
|
||||
pass
|
||||
|
||||
# check a real ``trio.Cancelled`` is raised on a checkpoint
|
||||
try:
|
||||
with trio.fail_after(0.1):
|
||||
await trio.sleep_forever()
|
||||
|
||||
except trio.Cancelled:
|
||||
raise
|
||||
|
||||
|
@ -420,34 +511,6 @@ async def test_callee_cancels_before_started():
|
|||
await portal.cancel_actor()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def really_started(
|
||||
ctx: tractor.Context,
|
||||
) -> None:
|
||||
await ctx.started()
|
||||
try:
|
||||
await ctx.started()
|
||||
except RuntimeError:
|
||||
raise
|
||||
|
||||
|
||||
def test_started_called_more_then_once():
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.start_actor(
|
||||
'too_much_starteds',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async with portal.open_context(really_started) as (ctx, sent):
|
||||
await trio.sleep(1)
|
||||
# pass
|
||||
|
||||
with pytest.raises(tractor.RemoteActorError):
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def never_open_stream(
|
||||
|
||||
|
|
Loading…
Reference in New Issue