forked from goodboy/tractor
1
0
Fork 0

Merge pull request #261 from goodboy/stricter_context_starting

`Context` oriented error relay and `MsgStream` overruns
agpl
goodboy 2021-12-07 11:22:48 -05:00 committed by GitHub
commit f7c9056419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1131 additions and 602 deletions

View File

@ -7,7 +7,7 @@ import tractor
async def stream_data(seed): async def stream_data(seed):
for i in range(seed): for i in range(seed):
yield i yield i
await trio.sleep(0) # trigger scheduler await trio.sleep(0.0001) # trigger scheduler
# this is the third actor; the aggregator # this is the third actor; the aggregator

View File

@ -0,0 +1,37 @@
Add cross-actor-task ``Context`` oriented error relay, a new
stream overrun error-signal ``StreamOverrun``, and support
disabling ``MsgStream`` backpressure as the default before a stream
is opened or by choice of the user.
We added stricter semantics around ``tractor.Context.open_stream():``
particularly to do with streams which are only opened at one end.
Previously, if only one end opened a stream there was no way for that
sender to know if msgs are being received until first, the feeder mem
chan on the receiver side hit a backpressure state and then that
condition delayed its msg loop processing task to eventually create
backpressure on the associated IPC transport. This is non-ideal in the
case where the receiver side never opened a stream by mistake since it
results in silent block of the sender and no adherence to the underlying
mem chan buffer size settings (which is still unsolved btw).
To solve this we add non-backpressure style message pushing inside
``Actor._push_result()`` by default and only use the backpressure
``trio.MemorySendChannel.send()`` call **iff** the local end of the
context has entered ``Context.open_stream():``. This way if the stream
was never opened but the mem chan is overrun, we relay back to the
sender a (new exception) ``SteamOverrun`` error which is raised in the
sender's scope with a special error message about the stream never
having been opened. Further, this behaviour (non-backpressure style
where senders can expect an error on overruns) can now be enabled with
``.open_stream(backpressure=False)`` and the underlying mem chan size
can be specified with a kwarg ``msg_buffer_size: int``.
Further bug fixes and enhancements in this changeset include:
- fix a race we were ignoring where if the callee task opened a context
it could enter ``Context.open_stream()`` before calling
``.started()``.
- Disallow calling ``Context.started()`` more then once.
- Enable ``Context`` linked tasks error relaying via the new
``Context._maybe_raise_from_remote_msg()`` which (for now) uses
a simple ``trio.Nursery.start_soon()`` to raise the error via closure
in the local scope.

View File

@ -1,417 +1,11 @@
""" """
Bidirectional streaming and context API. Bidirectional streaming.
""" """
import platform
import pytest import pytest
import trio import trio
import tractor import tractor
from conftest import tractor_test
# the general stream semantics are
# - normal termination: far end relays a stop message which
# terminates an ongoing ``MsgStream`` iteration
# - cancel termination: context is cancelled on either side cancelling
# the "linked" inter-actor task context
_state: bool = False
@tractor.context
async def simple_setup_teardown(
ctx: tractor.Context,
data: int,
block_forever: bool = False,
) -> None:
# startup phase
global _state
_state = True
# signal to parent that we're up
await ctx.started(data + 1)
try:
if block_forever:
# block until cancelled
await trio.sleep_forever()
else:
return 'yo'
finally:
_state = False
async def assert_state(value: bool):
global _state
assert _state == value
@pytest.mark.parametrize(
'error_parent',
[False, ValueError, KeyboardInterrupt],
)
@pytest.mark.parametrize(
'callee_blocks_forever',
[False, True],
ids=lambda item: f'callee_blocks_forever={item}'
)
@pytest.mark.parametrize(
'pointlessly_open_stream',
[False, True],
ids=lambda item: f'open_stream={item}'
)
def test_simple_context(
error_parent,
callee_blocks_forever,
pointlessly_open_stream,
):
timeout = 1.5 if not platform.system() == 'Windows' else 3
async def main():
with trio.fail_after(timeout):
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
'simple_context',
enable_modules=[__name__],
)
try:
async with portal.open_context(
simple_setup_teardown,
data=10,
block_forever=callee_blocks_forever,
) as (ctx, sent):
assert sent == 11
if callee_blocks_forever:
await portal.run(assert_state, value=True)
else:
assert await ctx.result() == 'yo'
if not error_parent:
await ctx.cancel()
if pointlessly_open_stream:
async with ctx.open_stream():
if error_parent:
raise error_parent
if callee_blocks_forever:
await ctx.cancel()
else:
# in this case the stream will send a
# 'stop' msg to the far end which needs
# to be ignored
pass
else:
if error_parent:
raise error_parent
finally:
# after cancellation
if not error_parent:
await portal.run(assert_state, value=False)
# shut down daemon
await portal.cancel_actor()
if error_parent:
try:
trio.run(main)
except error_parent:
pass
except trio.MultiError as me:
# XXX: on windows it seems we may have to expect the group error
from tractor._exceptions import is_multi_cancelled
assert is_multi_cancelled(me)
else:
trio.run(main)
# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
# - caller context calls `Context.cancel()` while streaming
# is ongoing resulting in callee being cancelled
# - callee calls `Context.cancel()` while streaming and caller
# sees stream terminated in `RemoteActorError`
# TODO: future possible features
# - restart request: far end raises `ContextRestart`
@tractor.context
async def close_ctx_immediately(
ctx: tractor.Context,
) -> None:
await ctx.started()
global _state
async with ctx.open_stream():
pass
@tractor_test
async def test_callee_closes_ctx_after_stream_open():
'callee context closes without using stream'
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'fast_stream_closer',
enable_modules=[__name__],
)
async with portal.open_context(
close_ctx_immediately,
# flag to avoid waiting the final result
# cancel_on_exit=True,
) as (ctx, sent):
assert sent is None
with trio.fail_after(0.5):
async with ctx.open_stream() as stream:
# should fall through since ``StopAsyncIteration``
# should be raised through translation of
# a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream:
assert 0
else:
# verify stream is now closed
try:
await stream.receive()
except trio.EndOfChannel:
pass
# TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of
# if/when we decide that's a good idea?)
try:
async with ctx.open_stream() as stream:
pass
except trio.ClosedResourceError:
pass
await portal.cancel_actor()
@tractor.context
async def expect_cancelled(
ctx: tractor.Context,
) -> None:
global _state
_state = True
await ctx.started()
try:
async with ctx.open_stream() as stream:
async for msg in stream:
await stream.send(msg) # echo server
except trio.Cancelled:
# expected case
_state = False
raise
else:
assert 0, "Wasn't cancelled!?"
@pytest.mark.parametrize(
'use_ctx_cancel_method',
[False, True],
)
@tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool,
):
'caller context closes without using stream'
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
async with portal.open_context(
expect_cancelled,
) as (ctx, sent):
await portal.run(assert_state, value=True)
assert sent is None
# call cancel explicitly
if use_ctx_cancel_method:
await ctx.cancel()
try:
async with ctx.open_stream() as stream:
async for msg in stream:
pass
except tractor.ContextCancelled:
raise # XXX: must be propagated to __aexit__
else:
assert 0, "Should have context cancelled?"
# channel should still be up
assert portal.channel.connected()
# ctx is closed here
await portal.run(assert_state, value=False)
else:
try:
with trio.fail_after(0.2):
await ctx.result()
assert 0, "Callee should have blocked!?"
except trio.TooSlowError:
await ctx.cancel()
try:
async with ctx.open_stream() as stream:
async for msg in stream:
pass
except tractor.ContextCancelled:
pass
else:
assert 0, "Should have received closed resource error?"
# ctx is closed here
await portal.run(assert_state, value=False)
# channel should not have been destroyed yet, only the
# inter-actor-task context
assert portal.channel.connected()
# teardown the actor
await portal.cancel_actor()
@tractor_test
async def test_multitask_caller_cancels_from_nonroot_task():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
async with portal.open_context(
expect_cancelled,
) as (ctx, sent):
await portal.run(assert_state, value=True)
assert sent is None
async with ctx.open_stream() as stream:
async def send_msg_then_cancel():
await stream.send('yo')
await portal.run(assert_state, value=True)
await ctx.cancel()
await portal.run(assert_state, value=False)
async with trio.open_nursery() as n:
n.start_soon(send_msg_then_cancel)
try:
async for msg in stream:
assert msg == 'yo'
except tractor.ContextCancelled:
raise # XXX: must be propagated to __aexit__
# channel should still be up
assert portal.channel.connected()
# ctx is closed here
await portal.run(assert_state, value=False)
# channel should not have been destroyed yet, only the
# inter-actor-task context
assert portal.channel.connected()
# teardown the actor
await portal.cancel_actor()
@tractor.context
async def cancel_self(
ctx: tractor.Context,
) -> None:
global _state
_state = True
await ctx.cancel()
try:
with trio.fail_after(0.1):
await trio.sleep_forever()
except trio.Cancelled:
raise
except trio.TooSlowError:
# should never get here
assert 0
@tractor_test
async def test_callee_cancels_before_started():
'''callee calls `Context.cancel()` while streaming and caller
sees stream terminated in `ContextCancelled`.
'''
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'cancels_self',
enable_modules=[__name__],
)
try:
async with portal.open_context(
cancel_self,
) as (ctx, sent):
async with ctx.open_stream():
await trio.sleep_forever()
# raises a special cancel signal
except tractor.ContextCancelled as ce:
ce.type == trio.Cancelled
# teardown the actor
await portal.cancel_actor()
@tractor.context @tractor.context
async def simple_rpc( async def simple_rpc(
@ -420,9 +14,10 @@ async def simple_rpc(
data: int, data: int,
) -> None: ) -> None:
"""Test a small ping-pong server. '''
Test a small ping-pong server.
""" '''
# signal to parent that we're up # signal to parent that we're up
await ctx.started(data + 1) await ctx.started(data + 1)
@ -480,9 +75,10 @@ async def simple_rpc_with_forloop(
[simple_rpc, simple_rpc_with_forloop], [simple_rpc, simple_rpc_with_forloop],
) )
def test_simple_rpc(server_func, use_async_for): def test_simple_rpc(server_func, use_async_for):
"""The simplest request response pattern. '''
The simplest request response pattern.
""" '''
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:

View File

@ -14,7 +14,7 @@ MESSAGE = 'tractoring at full speed'
@tractor.context @tractor.context
async def worker(ctx: tractor.Context) -> None: async def worker(ctx: tractor.Context) -> None:
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream(backpressure=True) as stream:
async for msg in stream: async for msg in stream:
# do something with msg # do something with msg
print(msg) print(msg)

View File

@ -0,0 +1,714 @@
'''
``async with ():`` inlined context-stream cancellation testing.
Verify the we raise errors when streams are opened prior to sync-opening
a ``tractor.Context`` beforehand.
'''
from itertools import count
import platform
from typing import Optional
import pytest
import trio
import tractor
from tractor._exceptions import StreamOverrun
from conftest import tractor_test
# ``Context`` semantics are as follows,
# ------------------------------------
# - standard setup/teardown:
# ``Portal.open_context()`` starts a new
# remote task context in another actor. The target actor's task must
# call ``Context.started()`` to unblock this entry on the caller side.
# the callee task executes until complete and returns a final value
# which is delivered to the caller side and retreived via
# ``Context.result()``.
# - cancel termination:
# context can be cancelled on either side where either end's task can
# call ``Context.cancel()`` which raises a local ``trio.Cancelled``
# and sends a task cancel request to the remote task which in turn
# raises a ``trio.Cancelled`` in that scope, catches it, and re-raises
# as ``ContextCancelled``. This is then caught by
# ``Portal.open_context()``'s exit and we get a graceful termination
# of the linked tasks.
# - error termination:
# error is caught after all context-cancel-scope tasks are cancelled
# via regular ``trio`` cancel scope semantics, error is sent to other
# side and unpacked as a `RemoteActorError`.
# ``Context.open_stream() as stream: MsgStream:`` msg semantics are:
# -----------------------------------------------------------------
# - either side can ``.send()`` which emits a 'yield' msgs and delivers
# a value to the a ``MsgStream.receive()`` call.
# - stream closure: one end relays a 'stop' message which terminates an
# ongoing ``MsgStream`` iteration.
# - cancel/error termination: as per the context semantics above but
# with implicit stream closure on the cancelling end.
_state: bool = False
@tractor.context
async def too_many_starteds(
ctx: tractor.Context,
) -> None:
'''
Call ``Context.started()`` more then once (an error).
'''
await ctx.started()
try:
await ctx.started()
except RuntimeError:
raise
@tractor.context
async def not_started_but_stream_opened(
ctx: tractor.Context,
) -> None:
'''
Enter ``Context.open_stream()`` without calling ``.started()``.
'''
try:
async with ctx.open_stream():
assert 0
except RuntimeError:
raise
@pytest.mark.parametrize(
'target',
[too_many_starteds, not_started_but_stream_opened],
ids='misuse_type={}'.format,
)
def test_started_misuse(target):
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
target.__name__,
enable_modules=[__name__],
)
async with portal.open_context(target) as (ctx, sent):
await trio.sleep(1)
with pytest.raises(tractor.RemoteActorError):
trio.run(main)
@tractor.context
async def simple_setup_teardown(
ctx: tractor.Context,
data: int,
block_forever: bool = False,
) -> None:
# startup phase
global _state
_state = True
# signal to parent that we're up
await ctx.started(data + 1)
try:
if block_forever:
# block until cancelled
await trio.sleep_forever()
else:
return 'yo'
finally:
_state = False
async def assert_state(value: bool):
global _state
assert _state == value
@pytest.mark.parametrize(
'error_parent',
[False, ValueError, KeyboardInterrupt],
)
@pytest.mark.parametrize(
'callee_blocks_forever',
[False, True],
ids=lambda item: f'callee_blocks_forever={item}'
)
@pytest.mark.parametrize(
'pointlessly_open_stream',
[False, True],
ids=lambda item: f'open_stream={item}'
)
def test_simple_context(
error_parent,
callee_blocks_forever,
pointlessly_open_stream,
):
timeout = 1.5 if not platform.system() == 'Windows' else 4
async def main():
with trio.fail_after(timeout):
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
'simple_context',
enable_modules=[__name__],
)
try:
async with portal.open_context(
simple_setup_teardown,
data=10,
block_forever=callee_blocks_forever,
) as (ctx, sent):
assert sent == 11
if callee_blocks_forever:
await portal.run(assert_state, value=True)
else:
assert await ctx.result() == 'yo'
if not error_parent:
await ctx.cancel()
if pointlessly_open_stream:
async with ctx.open_stream():
if error_parent:
raise error_parent
if callee_blocks_forever:
await ctx.cancel()
else:
# in this case the stream will send a
# 'stop' msg to the far end which needs
# to be ignored
pass
else:
if error_parent:
raise error_parent
finally:
# after cancellation
if not error_parent:
await portal.run(assert_state, value=False)
# shut down daemon
await portal.cancel_actor()
if error_parent:
try:
trio.run(main)
except error_parent:
pass
except trio.MultiError as me:
# XXX: on windows it seems we may have to expect the group error
from tractor._exceptions import is_multi_cancelled
assert is_multi_cancelled(me)
else:
trio.run(main)
# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
# - caller context calls `Context.cancel()` while streaming
# is ongoing resulting in callee being cancelled
# - callee calls `Context.cancel()` while streaming and caller
# sees stream terminated in `RemoteActorError`
# TODO: future possible features
# - restart request: far end raises `ContextRestart`
@tractor.context
async def close_ctx_immediately(
ctx: tractor.Context,
) -> None:
await ctx.started()
global _state
async with ctx.open_stream():
pass
@tractor_test
async def test_callee_closes_ctx_after_stream_open():
'callee context closes without using stream'
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'fast_stream_closer',
enable_modules=[__name__],
)
async with portal.open_context(
close_ctx_immediately,
# flag to avoid waiting the final result
# cancel_on_exit=True,
) as (ctx, sent):
assert sent is None
with trio.fail_after(0.5):
async with ctx.open_stream() as stream:
# should fall through since ``StopAsyncIteration``
# should be raised through translation of
# a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream:
assert 0
else:
# verify stream is now closed
try:
await stream.receive()
except trio.EndOfChannel:
pass
# TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of
# if/when we decide that's a good idea?)
try:
async with ctx.open_stream() as stream:
pass
except trio.ClosedResourceError:
pass
await portal.cancel_actor()
@tractor.context
async def expect_cancelled(
ctx: tractor.Context,
) -> None:
global _state
_state = True
await ctx.started()
try:
async with ctx.open_stream() as stream:
async for msg in stream:
await stream.send(msg) # echo server
except trio.Cancelled:
# expected case
_state = False
raise
else:
assert 0, "Wasn't cancelled!?"
@pytest.mark.parametrize(
'use_ctx_cancel_method',
[False, True],
)
@tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool,
):
'caller context closes without using stream'
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
async with portal.open_context(
expect_cancelled,
) as (ctx, sent):
await portal.run(assert_state, value=True)
assert sent is None
# call cancel explicitly
if use_ctx_cancel_method:
await ctx.cancel()
try:
async with ctx.open_stream() as stream:
async for msg in stream:
pass
except tractor.ContextCancelled:
raise # XXX: must be propagated to __aexit__
else:
assert 0, "Should have context cancelled?"
# channel should still be up
assert portal.channel.connected()
# ctx is closed here
await portal.run(assert_state, value=False)
else:
try:
with trio.fail_after(0.2):
await ctx.result()
assert 0, "Callee should have blocked!?"
except trio.TooSlowError:
await ctx.cancel()
try:
async with ctx.open_stream() as stream:
async for msg in stream:
pass
except tractor.ContextCancelled:
pass
else:
assert 0, "Should have received closed resource error?"
# ctx is closed here
await portal.run(assert_state, value=False)
# channel should not have been destroyed yet, only the
# inter-actor-task context
assert portal.channel.connected()
# teardown the actor
await portal.cancel_actor()
@tractor_test
async def test_multitask_caller_cancels_from_nonroot_task():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
async with portal.open_context(
expect_cancelled,
) as (ctx, sent):
await portal.run(assert_state, value=True)
assert sent is None
async with ctx.open_stream() as stream:
async def send_msg_then_cancel():
await stream.send('yo')
await portal.run(assert_state, value=True)
await ctx.cancel()
await portal.run(assert_state, value=False)
async with trio.open_nursery() as n:
n.start_soon(send_msg_then_cancel)
try:
async for msg in stream:
assert msg == 'yo'
except tractor.ContextCancelled:
raise # XXX: must be propagated to __aexit__
# channel should still be up
assert portal.channel.connected()
# ctx is closed here
await portal.run(assert_state, value=False)
# channel should not have been destroyed yet, only the
# inter-actor-task context
assert portal.channel.connected()
# teardown the actor
await portal.cancel_actor()
@tractor.context
async def cancel_self(
ctx: tractor.Context,
) -> None:
global _state
_state = True
await ctx.cancel()
# should inline raise immediately
try:
async with ctx.open_stream():
pass
except ContextCancelled:
pass
# check a real ``trio.Cancelled`` is raised on a checkpoint
try:
with trio.fail_after(0.1):
await trio.sleep_forever()
except trio.Cancelled:
raise
except trio.TooSlowError:
# should never get here
assert 0
@tractor_test
async def test_callee_cancels_before_started():
'''
Callee calls `Context.cancel()` while streaming and caller
sees stream terminated in `ContextCancelled`.
'''
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'cancels_self',
enable_modules=[__name__],
)
try:
async with portal.open_context(
cancel_self,
) as (ctx, sent):
async with ctx.open_stream():
await trio.sleep_forever()
# raises a special cancel signal
except tractor.ContextCancelled as ce:
ce.type == trio.Cancelled
# teardown the actor
await portal.cancel_actor()
@tractor.context
async def never_open_stream(
ctx: tractor.Context,
) -> None:
'''
Context which never opens a stream and blocks.
'''
await ctx.started()
await trio.sleep_forever()
@tractor.context
async def keep_sending_from_callee(
ctx: tractor.Context,
msg_buffer_size: Optional[int] = None,
) -> None:
'''
Send endlessly on the calleee stream.
'''
await ctx.started()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,
) as stream:
for msg in count():
print(f'callee sending {msg}')
await stream.send(msg)
await trio.sleep(0.01)
@pytest.mark.parametrize(
'overrun_by',
[
('caller', 1, never_open_stream),
('cancel_caller_during_overrun', 1, never_open_stream),
('callee', 0, keep_sending_from_callee),
],
ids='overrun_condition={}'.format,
)
def test_one_end_stream_not_opened(overrun_by):
'''
This should exemplify the bug from:
https://github.com/goodboy/tractor/issues/265
'''
overrunner, buf_size_increase, entrypoint = overrun_by
from tractor._actor import Actor
buf_size = buf_size_increase + Actor.msg_buffer_size
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
entrypoint.__name__,
enable_modules=[__name__],
)
async with portal.open_context(
entrypoint,
) as (ctx, sent):
assert sent is None
if 'caller' in overrunner:
async with ctx.open_stream() as stream:
for i in range(buf_size):
print(f'sending {i}')
await stream.send(i)
if 'cancel' in overrunner:
# without this we block waiting on the child side
await ctx.cancel()
else:
# expect overrun error to be relayed back
# and this sleep interrupted
await trio.sleep_forever()
else:
# callee overruns caller case so we do nothing here
await trio.sleep_forever()
await portal.cancel_actor()
# 2 overrun cases and the no overrun case (which pushes right up to
# the msg limit)
if overrunner == 'caller':
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == StreamOverrun
elif 'cancel' in overrunner:
with pytest.raises(trio.MultiError) as excinfo:
trio.run(main)
multierr = excinfo.value
for exc in multierr.exceptions:
etype = type(exc)
if etype == tractor.RemoteActorError:
assert exc.type == StreamOverrun
else:
assert etype == tractor.ContextCancelled
elif overrunner == 'callee':
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
# TODO: embedded remote errors so that we can verify the source
# error?
# the callee delivers an error which is an overrun wrapped
# in a remote actor error.
assert excinfo.value.type == tractor.RemoteActorError
else:
trio.run(main)
@tractor.context
async def echo_back_sequence(
ctx: tractor.Context,
seq: list[int],
msg_buffer_size: Optional[int] = None,
) -> None:
'''
Send endlessly on the calleee stream.
'''
await ctx.started()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,
) as stream:
seq = list(seq) # bleh, `msgpack`...
count = 0
while count < 3:
batch = []
async for msg in stream:
batch.append(msg)
if batch == seq:
break
for msg in batch:
print(f'callee sending {msg}')
await stream.send(msg)
count += 1
return 'yo'
def test_stream_backpressure():
'''
Demonstrate small overruns of each task back and forth
on a stream not raising any errors by default.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'callee_sends_forever',
enable_modules=[__name__],
)
seq = list(range(3))
async with portal.open_context(
echo_back_sequence,
seq=seq,
msg_buffer_size=1,
) as (ctx, sent):
assert sent is None
async with ctx.open_stream(msg_buffer_size=1) as stream:
count = 0
while count < 3:
for msg in seq:
print(f'caller sending {msg}')
await stream.send(msg)
await trio.sleep(0.1)
batch = []
async for msg in stream:
batch.append(msg)
if batch == seq:
break
count += 1
# here the context should return
assert await ctx.result() == 'yo'
# cancel the daemon
await portal.cancel_actor()
trio.run(main)

View File

@ -132,7 +132,7 @@ async def stream_data(seed):
yield i yield i
# trigger scheduler to simulate practical usage # trigger scheduler to simulate practical usage
await trio.sleep(0) await trio.sleep(0.0001)
# this is the third actor; the aggregator # this is the third actor; the aggregator

View File

@ -83,7 +83,7 @@ async def open_sequence_streamer(
) as (ctx, first): ) as (ctx, first):
assert first is None assert first is None
async with ctx.open_stream() as stream: async with ctx.open_stream(backpressure=True) as stream:
yield stream yield stream
await portal.cancel_actor() await portal.cancel_actor()
@ -334,7 +334,7 @@ def test_ensure_slow_consumers_lag_out(
if task.name == 'sub_1': if task.name == 'sub_1':
# trigger checkpoint to clean out other subs # trigger checkpoint to clean out other subs
await trio.sleep(0) await trio.sleep(0.01)
# the non-lagger got # the non-lagger got
# a ``trio.EndOfChannel`` # a ``trio.EndOfChannel``
@ -401,7 +401,7 @@ def test_ensure_slow_consumers_lag_out(
assert not tx._state.open_send_channels assert not tx._state.open_send_channels
# check that "first" bcaster that we created # check that "first" bcaster that we created
# above, never wass iterated and is thus overrun # above, never was iterated and is thus overrun
try: try:
await brx.receive() await brx.receive()
except Lagged: except Lagged:

View File

@ -32,6 +32,7 @@ from ._exceptions import (
is_multi_cancelled, is_multi_cancelled,
ContextCancelled, ContextCancelled,
TransportClosed, TransportClosed,
StreamOverrun,
) )
from . import _debug from . import _debug
from ._discovery import get_arbiter from ._discovery import get_arbiter
@ -50,6 +51,7 @@ async def _invoke(
chan: Channel, chan: Channel,
func: typing.Callable, func: typing.Callable,
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
task_status: TaskStatus[ task_status: TaskStatus[
Union[trio.CancelScope, BaseException] Union[trio.CancelScope, BaseException]
@ -68,9 +70,10 @@ async def _invoke(
tb = None tb = None
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
# activated cancel scope ref
cs: Optional[trio.CancelScope] = None cs: Optional[trio.CancelScope] = None
ctx = Context(chan, cid) ctx = actor.get_context(chan, cid)
context: bool = False context: bool = False
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
@ -159,15 +162,33 @@ async def _invoke(
# context func with support for bi-dir streaming # context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid}) await chan.send({'functype': 'context', 'cid': cid})
try:
async with trio.open_nursery() as scope_nursery: async with trio.open_nursery() as scope_nursery:
ctx._scope_nursery = scope_nursery ctx._scope_nursery = scope_nursery
cs = scope_nursery.cancel_scope cs = scope_nursery.cancel_scope
task_status.started(cs) task_status.started(cs)
try:
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except trio.Cancelled as err: except trio.Cancelled as err:
tb = err.__traceback__ tb = err.__traceback__
except trio.MultiError:
# if a context error was set then likely
# thei multierror was raised due to that
if ctx._error is not None:
raise ctx._error from None
raise
finally:
# XXX: only pop the context tracking if
# a ``@tractor.context`` entrypoint was called
assert chan.uid
ctx = actor._contexts.pop((chan.uid, cid))
if ctx:
log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}')
assert cs
if cs.cancelled_caught: if cs.cancelled_caught:
# TODO: pack in ``trio.Cancelled.__traceback__`` here # TODO: pack in ``trio.Cancelled.__traceback__`` here
@ -246,6 +267,7 @@ async def _invoke(
try: try:
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
is_complete.set() is_complete.set()
except KeyError: except KeyError:
if is_rpc: if is_rpc:
# If we're cancelled before the task returns then the # If we're cancelled before the task returns then the
@ -312,6 +334,7 @@ class Actor:
# ugh, we need to get rid of this and replace with a "registry" sys # ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216 # https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False is_arbiter: bool = False
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `_async_main()` after fork # nursery placeholders filled in by `_async_main()` after fork
_root_n: Optional[trio.Nursery] = None _root_n: Optional[trio.Nursery] = None
@ -379,19 +402,19 @@ class Actor:
self._no_more_peers.set() self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
self._ongoing_rpc_tasks.set() self._ongoing_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
Tuple[Channel, str], Tuple[Channel, str],
Tuple[trio.CancelScope, typing.Callable, trio.Event] Tuple[trio.CancelScope, typing.Callable, trio.Event]
] = {} ] = {}
# map {uids -> {callids -> waiter queues}}
self._cids2qs: dict[ # map {actor uids -> Context}
self._contexts: dict[
Tuple[Tuple[str, str], str], Tuple[Tuple[str, str], str],
Tuple[ Context
trio.abc.SendChannel[Any],
trio.abc.ReceiveChannel[Any]
]
] = {} ] = {}
self._listeners: List[trio.abc.Listener] = [] self._listeners: List[trio.abc.Listener] = []
self._parent_chan: Optional[Channel] = None self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[ self._forkserver_info: Optional[
@ -546,7 +569,7 @@ class Actor:
# now in a cancelled condition) when the local runtime here # now in a cancelled condition) when the local runtime here
# is now cancelled while (presumably) in the middle of msg # is now cancelled while (presumably) in the middle of msg
# loop processing. # loop processing.
with trio.move_on_after(0.1) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
# Attempt to wait for the far end to close the channel # Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure). # and bail after timeout (2-generals on closure).
@ -567,17 +590,7 @@ class Actor:
await local_nursery.exited.wait() await local_nursery.exited.wait()
# channel cleanup sequence # ``Channel`` teardown and closure sequence
# for (channel, cid) in self._rpc_tasks.copy():
# if channel is chan:
# with trio.CancelScope(shield=True):
# await self._cancel_task(cid, channel)
# # close all consumer side task mem chans
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
# assert send_chan.cid == cid # type: ignore
# await send_chan.aclose()
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
log.runtime(f"Releasing channel {chan} from {chan.uid}") log.runtime(f"Releasing channel {chan} from {chan.uid}")
@ -588,6 +601,10 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None) self._peers.pop(chan.uid, None)
# for (uid, cid) in self._contexts.copy():
# if chan.uid == uid:
# self._contexts.pop((uid, cid))
log.runtime(f"Peers is {self._peers}") log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected if not self._peers: # no more channels connected
@ -619,26 +636,32 @@ class Actor:
cid: str, cid: str,
msg: dict[str, Any], msg: dict[str, Any],
) -> None: ) -> None:
"""Push an RPC result to the local consumer's queue. '''
""" Push an RPC result to the local consumer's queue.
# actorid = chan.uid
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
assert send_chan.cid == cid # type: ignore
# if 'error' in msg:
# ctx = getattr(recv_chan, '_ctx', None)
# if ctx:
# ctx._error_from_remote_msg(msg)
# log.runtime(f"{send_chan} was terminated at remote end")
# # indicate to consumer that far end has stopped
# return await send_chan.aclose()
'''
uid = chan.uid
assert uid, f"`chan.uid` can't be {uid}"
try: try:
ctx = self._contexts[(uid, cid)]
except KeyError:
log.warning(
f'Ignoring msg from [no-longer/un]known context with {uid}:'
f'\n{msg}')
return
send_chan = ctx._send_chan
log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
# maintain backpressure
await send_chan.send(msg) # XXX: we do **not** maintain backpressure and instead
# opt to relay stream overrun errors to the sender.
try:
send_chan.send_nowait(msg)
# if an error is deteced we should always
# expect it to be raised by any context (stream)
# consumer task
await ctx._maybe_raise_from_remote_msg(msg)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# TODO: what is the right way to handle the case where the # TODO: what is the right way to handle the case where the
@ -650,44 +673,120 @@ class Actor:
# XXX: local consumer has closed their side # XXX: local consumer has closed their side
# so cancel the far end streaming task # so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed") log.warning(f"{send_chan} consumer is already closed")
return
def get_memchans( except trio.WouldBlock:
self, # XXX: always push an error even if the local
actorid: Tuple[str, str], # receiver is in overrun state.
cid: str await ctx._maybe_raise_from_remote_msg(msg)
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: uid = chan.uid
lines = [
'Task context stream was overrun',
f'local task: {cid} @ {self.uid}',
f'remote sender: {uid}',
]
if not ctx._stream_opened:
lines.insert(
1,
f'\n*** No stream open on `{self.uid[0]}` side! ***\n'
)
text = '\n'.join(lines)
log.runtime(f"Getting result queue for {actorid} cid {cid}") if ctx._backpressure:
log.warning(text)
await send_chan.send(msg)
else:
try: try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)] raise StreamOverrun(text) from None
except StreamOverrun as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
def get_context(
self,
chan: Channel,
cid: str,
msg_buffer_size: Optional[int] = None,
) -> Context:
'''
Look up or create a new inter-actor-task-IPC-linked task
"context" which encapsulates the local task's scheduling
enviroment including a ``trio`` cancel scope, a pair of IPC
messaging "feeder" channels, and an RPC id unique to the
task-as-function invocation.
'''
log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
actor_uid = chan.uid
assert actor_uid
try:
ctx = self._contexts[(actor_uid, cid)]
# adjust buffer size if specified
state = ctx._send_chan._state # type: ignore
if msg_buffer_size and state.max_buffer_size != msg_buffer_size:
state.max_buffer_size = msg_buffer_size
except KeyError: except KeyError:
send_chan, recv_chan = trio.open_memory_channel(2**6) send_chan: trio.MemorySendChannel
send_chan.cid = cid # type: ignore recv_chan: trio.MemoryReceiveChannel
recv_chan.cid = cid # type: ignore send_chan, recv_chan = trio.open_memory_channel(
self._cids2qs[(actorid, cid)] = send_chan, recv_chan msg_buffer_size or self.msg_buffer_size)
ctx = Context(
chan,
cid,
_send_chan=send_chan,
_recv_chan=recv_chan,
)
self._contexts[(actor_uid, cid)] = ctx
return send_chan, recv_chan return ctx
async def send_cmd( async def start_remote_task(
self, self,
chan: Channel, chan: Channel,
ns: str, ns: str,
func: str, func: str,
kwargs: dict kwargs: dict,
) -> Tuple[str, trio.abc.ReceiveChannel]: msg_buffer_size: Optional[int] = None,
) -> Context:
''' '''
Send a ``'cmd'`` message to a remote actor and return a Send a ``'cmd'`` message to a remote actor, which starts
caller id and a ``trio.Queue`` that can be used to wait for a remote task-as-function entrypoint.
responses delivered by the local message processing loop.
Synchronously validates the endpoint type and return a caller
side task ``Context`` that can be used to wait for responses
delivered by the local runtime's message processing loop.
''' '''
cid = str(uuid.uuid4()) cid = str(uuid.uuid4())
assert chan.uid assert chan.uid
send_chan, recv_chan = self.get_memchans(chan.uid, cid) ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size)
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return cid, recv_chan
# Wait on first response msg and validate; this should be
# immediate.
first_msg = await ctx._recv_chan.receive()
functype = first_msg.get('functype')
if 'error' in first_msg:
raise unpack_error(first_msg, chan)
elif functype not in ('asyncfunc', 'asyncgen', 'context'):
raise ValueError(f"{first_msg} is an invalid response packet?")
ctx._remote_func_type = functype
return ctx
async def _process_messages( async def _process_messages(
self, self,
@ -721,7 +820,8 @@ class Actor:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.cancel( log.cancel(
f"Cancelling all tasks for {chan} from {chan.uid}") f"Channerl to {chan.uid} terminated?\n"
"Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy(): for (channel, cid) in self._rpc_tasks.copy():
if channel is chan: if channel is chan:

View File

@ -61,6 +61,10 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
class StreamOverrun(trio.TooSlowError):
"This stream was overrun by sender"
def pack_error( def pack_error(
exc: BaseException, exc: BaseException,
tb=None, tb=None,

View File

@ -6,7 +6,7 @@ concurrency linked tasks running in disparate memory domains.
import importlib import importlib
import inspect import inspect
from typing import ( from typing import (
Tuple, Any, Dict, Optional, Set, Any, Optional,
Callable, AsyncGenerator Callable, AsyncGenerator
) )
from functools import partial from functools import partial
@ -49,7 +49,7 @@ async def maybe_open_nursery(
yield nursery yield nursery
def func_deats(func: Callable) -> Tuple[str, str]: def func_deats(func: Callable) -> tuple[str, str]:
return ( return (
func.__module__, func.__module__,
func.__name__, func.__name__,
@ -98,68 +98,45 @@ class Portal:
# during the portal's lifetime # during the portal's lifetime
self._result_msg: Optional[dict] = None self._result_msg: Optional[dict] = None
# When this is set to a tuple returned from ``_submit()`` then # When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some # it is expected that ``result()`` will be awaited at some
# point. Set when _submit_for_result is called # point.
self._expect_result: Optional[ self._expect_result: Optional[Context] = None
Tuple[str, Any, str, Dict[str, Any]] self._streams: set[ReceiveMsgStream] = set()
] = None
self._streams: Set[ReceiveMsgStream] = set()
self.actor = current_actor() self.actor = current_actor()
async def _submit( async def _submit_for_result(
self, self,
ns: str, ns: str,
func: str, func: str,
kwargs, **kwargs
) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: ) -> None:
"""Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str,
first message packet as a tuple.
This is an async call.
"""
# ship a function call request to the remote actor
cid, recv_chan = await self.actor.send_cmd(
self.channel, ns, func, kwargs)
# wait on first response msg and handle (this should be
# in an immediate response)
first_msg = await recv_chan.receive()
functype = first_msg.get('functype')
if 'error' in first_msg:
raise unpack_error(first_msg, self.channel)
elif functype not in ('asyncfunc', 'asyncgen', 'context'):
raise ValueError(f"{first_msg} is an invalid response packet?")
return cid, recv_chan, functype, first_msg
async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
assert self._expect_result is None, \ assert self._expect_result is None, \
"A pending main result has already been submitted" "A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, kwargs) self._expect_result = await self.actor.start_remote_task(
self.channel,
ns,
func,
kwargs
)
async def _return_once( async def _return_once(
self, self,
cid: str, ctx: Context,
recv_chan: trio.abc.ReceiveChannel,
resptype: str,
first_msg: dict
) -> dict[str, Any]: ) -> dict[str, Any]:
assert resptype == 'asyncfunc' # single response assert ctx._remote_func_type == 'asyncfunc' # single response
msg = await recv_chan.receive() msg = await ctx._recv_chan.receive()
return msg return msg
async def result(self) -> Any: async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task. '''
""" Return the result(s) from the remote actor's "main" task.
'''
# Check for non-rpc errors slapped on the # Check for non-rpc errors slapped on the
# channel for which we always raise # channel for which we always raise
exc = self.channel._exc exc = self.channel._exc
@ -178,7 +155,9 @@ class Portal:
assert self._expect_result assert self._expect_result
if self._result_msg is None: if self._result_msg is None:
self._result_msg = await self._return_once(*self._expect_result) self._result_msg = await self._return_once(
self._expect_result
)
return _unwrap_msg(self._result_msg, self.channel) return _unwrap_msg(self._result_msg, self.channel)
@ -244,7 +223,8 @@ class Portal:
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.cancel( log.cancel(
f"{self.channel} for {self.channel.uid} was already closed or broken?") f"{self.channel} for {self.channel.uid} was already "
"closed or broken?")
return False return False
async def run_from_ns( async def run_from_ns(
@ -269,9 +249,14 @@ class Portal:
internals. internals.
''' '''
msg = await self._return_once( ctx = await self.actor.start_remote_task(
*(await self._submit(namespace_path, function_name, kwargs)) self.channel,
namespace_path,
function_name,
kwargs,
) )
ctx._portal = self
msg = await self._return_once(ctx)
return _unwrap_msg(msg, self.channel) return _unwrap_msg(msg, self.channel)
async def run( async def run(
@ -313,10 +298,15 @@ class Portal:
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs,
)
ctx._portal = self
return _unwrap_msg( return _unwrap_msg(
await self._return_once( await self._return_once(ctx),
*(await self._submit(fn_mod_path, fn_name, kwargs)),
),
self.channel, self.channel,
) )
@ -337,27 +327,21 @@ class Portal:
f'{async_gen_func} must be an async generator function!') f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = func_deats(async_gen_func) fn_mod_path, fn_name = func_deats(async_gen_func)
( ctx = await self.actor.start_remote_task(
cid,
recv_chan,
functype,
first_msg
) = await self._submit(fn_mod_path, fn_name, kwargs)
# receive only stream
assert functype == 'asyncgen'
ctx = Context(
self.channel, self.channel,
cid, fn_mod_path,
# do we need this to be closed implicitly? fn_name,
# _recv_chan=recv_chan, kwargs
_portal=self
) )
ctx._portal = self
# ensure receive-only stream entrypoint
assert ctx._remote_func_type == 'asyncgen'
try: try:
# deliver receive only stream # deliver receive only stream
async with ReceiveMsgStream( async with ReceiveMsgStream(
ctx, recv_chan, ctx, ctx._recv_chan,
) as rchan: ) as rchan:
self._streams.add(rchan) self._streams.add(rchan)
yield rchan yield rchan
@ -391,8 +375,9 @@ class Portal:
func: Callable, func: Callable,
**kwargs, **kwargs,
) -> AsyncGenerator[Tuple[Context, Any], None]: ) -> AsyncGenerator[tuple[Context, Any], None]:
'''Open an inter-actor task context. '''
Open an inter-actor task context.
This is a synchronous API which allows for deterministic This is a synchronous API which allows for deterministic
setup/teardown of a remote task. The yielded ``Context`` further setup/teardown of a remote task. The yielded ``Context`` further
@ -400,7 +385,6 @@ class Portal:
and synchronized final result collection. See ``tractor.Context``. and synchronized final result collection. See ``tractor.Context``.
''' '''
# conduct target func method structural checks # conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and ( if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False) getattr(func, '_tractor_contex_function', False)
@ -408,20 +392,25 @@ class Portal:
raise TypeError( raise TypeError(
f'{func} must be an async generator function!') f'{func} must be an async generator function!')
__tracebackhide__ = True
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
recv_chan: Optional[trio.MemoryReceiveChannel] = None ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs
)
cid, recv_chan, functype, first_msg = await self._submit( assert ctx._remote_func_type == 'context'
fn_mod_path, fn_name, kwargs) msg = await ctx._recv_chan.receive()
assert functype == 'context'
msg = await recv_chan.receive()
try: try:
# the "first" value here is delivered by the callee's # the "first" value here is delivered by the callee's
# ``Context.started()`` call. # ``Context.started()`` call.
first = msg['started'] first = msg['started']
ctx._started_called = True
except KeyError: except KeyError:
assert msg.get('cid'), ("Received internal error at context?") assert msg.get('cid'), ("Received internal error at context?")
@ -433,19 +422,14 @@ class Portal:
raise raise
_err: Optional[BaseException] = None _err: Optional[BaseException] = None
ctx._portal = self
# deliver context instance and .started() msg value in open tuple. # deliver context instance and .started() msg value in open tuple.
try: try:
async with trio.open_nursery() as scope_nursery: async with trio.open_nursery() as scope_nursery:
ctx = Context( ctx._scope_nursery = scope_nursery
self.channel,
cid,
_portal=self,
_recv_chan=recv_chan,
_scope_nursery=scope_nursery,
)
# pairs with handling in ``Actor._push_result()`` # do we need this?
# recv_chan._ctx = ctx
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield ctx, first yield ctx, first
@ -478,8 +462,9 @@ class Portal:
_err = err _err = err
# the context cancels itself on any cancel # the context cancels itself on any cancel
# causing error. # causing error.
log.cancel(f'Context {ctx} sending cancel to far end') log.cancel(
with trio.CancelScope(shield=True): f'Context to {self.channel.uid} sending cancel request..')
await ctx.cancel() await ctx.cancel()
raise raise
@ -495,8 +480,9 @@ class Portal:
# operating *in* this scope to have survived # operating *in* this scope to have survived
# we tear down the runtime feeder chan last # we tear down the runtime feeder chan last
# to avoid premature stream clobbers. # to avoid premature stream clobbers.
if recv_chan is not None: if ctx._recv_chan is not None:
await recv_chan.aclose() # should we encapsulate this in the context api?
await ctx._recv_chan.aclose()
if _err: if _err:
if ctx._cancel_called: if ctx._cancel_called:
@ -513,6 +499,9 @@ class Portal:
f'value from callee `{result}`' f'value from callee `{result}`'
) )
# remove the context from runtime tracking
self.actor._contexts.pop((self.channel.uid, ctx.cid))
@dataclass @dataclass
class LocalPortal: class LocalPortal:

View File

@ -32,9 +32,10 @@ log = get_logger(__name__)
class ReceiveMsgStream(trio.abc.ReceiveChannel): class ReceiveMsgStream(trio.abc.ReceiveChannel):
'''A IPC message stream for receiving logically sequenced values '''
over an inter-actor ``Channel``. This is the type returned to A IPC message stream for receiving logically sequenced values over
a local task which entered either ``Portal.open_stream_from()`` or an inter-actor ``Channel``. This is the type returned to a local
task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``. ``Context.open_stream()``.
Termination rules: Termination rules:
@ -177,7 +178,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# In the bidirectional case, `Context.open_stream()` will create # In the bidirectional case, `Context.open_stream()` will create
# the `Actor._cids2qs` entry from a call to # the `Actor._cids2qs` entry from a call to
# `Actor.get_memchans()` and will send the stop message in # `Actor.get_context()` and will send the stop message in
# ``__aexit__()`` on teardown so it **does not** need to be # ``__aexit__()`` on teardown so it **does not** need to be
# called here. # called here.
if not self._ctx._portal: if not self._ctx._portal:
@ -189,6 +190,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# was it shouldn't matter since it's unlikely a user # was it shouldn't matter since it's unlikely a user
# will try to re-use a stream after attemping to close # will try to re-use a stream after attemping to close
# it). # it).
with trio.CancelScope(shield=True):
await self._ctx.send_stop() await self._ctx.send_stop()
except ( except (
@ -282,11 +284,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
class MsgStream(ReceiveMsgStream, trio.abc.Channel): class MsgStream(ReceiveMsgStream, trio.abc.Channel):
""" '''
Bidirectional message stream for use within an inter-actor actor Bidirectional message stream for use within an inter-actor actor
``Context```. ``Context```.
""" '''
async def send( async def send(
self, self,
data: Any data: Any
@ -297,36 +299,58 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
# if self._eoc: # if self._eoc:
# raise trio.ClosedResourceError('This stream is already ded') # raise trio.ClosedResourceError('This stream is already ded')
if self._ctx._error:
raise self._ctx._error # from None
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
@dataclass @dataclass
class Context: class Context:
'''An inter-actor task communication context. '''
An inter-actor, ``trio`` task communication context.
NB: This class should never be instatiated directly, it is delivered
by either runtime machinery to a remotely started task or by entering
``Portal.open_context()``.
Allows maintaining task or protocol specific state between Allows maintaining task or protocol specific state between
2 communicating actor tasks. A unique context is created on the 2 communicating actor tasks. A unique context is created on the
callee side/end for every request to a remote actor from a portal. callee side/end for every request to a remote actor from a portal.
A context can be cancelled and (possibly eventually restarted) from A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel. either side of the underlying IPC channel, open task oriented
message streams and acts as an IPC aware inter-actor-task cancel
A context can be used to open task oriented message streams and can scope.
be thought of as an IPC aware inter-actor cancel scope.
''' '''
chan: Channel chan: Channel
cid: str cid: str
# these are the "feeder" channels for delivering
# message values to the local task from the runtime
# msg processing loop.
_recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel
_remote_func_type: Optional[str] = None
# only set on the caller side # only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa _portal: Optional['Portal'] = None # type: ignore # noqa
_recv_chan: Optional[trio.MemoryReceiveChannel] = None
_result: Optional[Any] = False _result: Optional[Any] = False
_error: Optional[BaseException] = None
# status flags
_cancel_called: bool = False _cancel_called: bool = False
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
# only set on the callee side # only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None _scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = False
async def send_yield(self, data: Any) -> None: async def send_yield(self, data: Any) -> None:
warnings.warn( warnings.warn(
@ -340,26 +364,59 @@ class Context:
async def send_stop(self) -> None: async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid}) await self.chan.send({'stop': True, 'cid': self.cid})
def _error_from_remote_msg( async def _maybe_raise_from_remote_msg(
self, self,
msg: Dict[str, Any], msg: Dict[str, Any],
) -> None: ) -> None:
'''Unpack and raise a msg error into the local scope '''
(Maybe) unpack and raise a msg error into the local scope
nursery for this context. nursery for this context.
Acts as a form of "relay" for a remote error raised Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task. in the corresponding remote callee task.
''' '''
assert self._scope_nursery error = msg.get('error')
if error:
# If this is an error message from a context opened by
# ``Portal.open_context()`` we want to interrupt any ongoing
# (child) tasks within that context to be notified of the remote
# error relayed here.
#
# The reason we may want to raise the remote error immediately
# is that there is no guarantee the associated local task(s)
# will attempt to read from any locally opened stream any time
# soon.
#
# NOTE: this only applies when
# ``Portal.open_context()`` has been called since it is assumed
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
# await ctx._maybe_error_from_remote_msg(msg)
self._error = unpack_error(msg, self.chan)
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
if self._scope_nursery:
async def raiser(): async def raiser():
raise unpack_error(msg, self.chan) raise self._error from None
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
if not self._scope_nursery._closed: # type: ignore
self._scope_nursery.start_soon(raiser) self._scope_nursery.start_soon(raiser)
async def cancel(self) -> None: async def cancel(self) -> None:
'''Cancel this inter-actor-task context. '''
Cancel this inter-actor-task context.
Request that the far side cancel it's current linked context, Request that the far side cancel it's current linked context,
Timeout quickly in an attempt to sidestep 2-generals... Timeout quickly in an attempt to sidestep 2-generals...
@ -420,9 +477,12 @@ class Context:
async def open_stream( async def open_stream(
self, self,
backpressure: Optional[bool] = True,
msg_buffer_size: Optional[int] = None,
) -> AsyncGenerator[MsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
'''Open a ``MsgStream``, a bi-directional stream connected to the '''
Open a ``MsgStream``, a bi-directional stream connected to the
cross-actor (far end) task for this ``Context``. cross-actor (far end) task for this ``Context``.
This context manager must be entered on both the caller and This context manager must be entered on both the caller and
@ -455,34 +515,44 @@ class Context:
f'Context around {actor.uid[0]}:{task} was already cancelled!' f'Context around {actor.uid[0]}:{task} was already cancelled!'
) )
if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the # NOTE: in one way streaming this only happens on the
# caller side inside `Actor.send_cmd()` so if you try # caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the # to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error # single-direction-stream case you'll get a lookup error
# currently. # currently.
_, recv_chan = actor.get_memchans( ctx = actor.get_context(
self.chan.uid, self.chan,
self.cid self.cid,
msg_buffer_size=msg_buffer_size,
) )
ctx._backpressure = backpressure
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has # XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited # been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other # a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``. # unanticipated error or cancellation from ``trio``.
if recv_chan._closed: if ctx._recv_chan._closed:
raise trio.ClosedResourceError( raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?') 'The underlying channel for this stream was already closed!?')
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=recv_chan, rx_chan=ctx._recv_chan,
) as rchan: ) as rchan:
if self._portal: if self._portal:
self._portal._streams.add(rchan) self._portal._streams.add(rchan)
try: try:
self._stream_opened = True
# ensure we aren't cancelled before delivering # ensure we aren't cancelled before delivering
# the stream # the stream
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
@ -518,7 +588,7 @@ class Context:
try: try:
self._result = msg['return'] self._result = msg['return']
break break
except KeyError: except KeyError as msgerr:
if 'yield' in msg: if 'yield' in msg:
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
@ -532,17 +602,36 @@ class Context:
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), (
"Received internal error at portal?") "Received internal error at portal?")
raise unpack_error(msg, self._portal.channel)
raise unpack_error(
msg, self._portal.channel
) from msgerr
return self._result return self._result
async def started(self, value: Optional[Any] = None) -> None: async def started(
self,
value: Optional[Any] = None
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal: if self._portal:
raise RuntimeError( raise RuntimeError(
f"Caller side context {self} can not call started!") f"Caller side context {self} can not call started!")
elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")
await self.chan.send({'started': value, 'cid': self.cid}) await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
# TODO: do we need a restart api? # TODO: do we need a restart api?
# async def restart(self) -> None: # async def restart(self) -> None: