From f4793af2b961f571123a23f78c8ffebc3ea1869b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 11:36:25 -0400 Subject: [PATCH 01/31] Error on mal-use of `Context.started()` Previously we were ignoring a race where the callee an opened task context could enter `Context.open_stream()` before calling `.started(). Disallow this as well as calling `.started()` more then once. --- tractor/_streaming.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 5c22116..af9ffaf 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -323,6 +323,8 @@ class Context: _recv_chan: Optional[trio.MemoryReceiveChannel] = None _result: Optional[Any] = False _cancel_called: bool = False + _started_called: bool = False + _started_received: bool = False # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None @@ -455,6 +457,11 @@ class Context: f'Context around {actor.uid[0]}:{task} was already cancelled!' ) + if not self._portal and not self._started_called: + raise RuntimeError( + 'Context.started()` must be called before opening a stream' + ) + # NOTE: in one way streaming this only happens on the # caller side inside `Actor.send_cmd()` so if you try # to send a stop from the caller to the callee in the @@ -536,13 +543,29 @@ class Context: return self._result - async def started(self, value: Optional[Any] = None) -> None: + async def started( + self, + value: Optional[Any] = None + ) -> None: + ''' + Indicate to calling actor's task that this linked context + has started and send ``value`` to the other side. + + On the calling side ``value`` is the second item delivered + in the tuple returned by ``Portal.open_context()``. + + ''' if self._portal: raise RuntimeError( f"Caller side context {self} can not call started!") + elif self._started_called: + raise RuntimeError( + f"called 'started' twice on context with {self.chan.uid}") + await self.chan.send({'started': value, 'cid': self.cid}) + self._started_called = True # TODO: do we need a restart api? # async def restart(self) -> None: From 568902a5a94754d87df4bb25ac55066b3bf107e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 28 Nov 2021 18:51:34 -0500 Subject: [PATCH 02/31] Add test for #265: "msg sent before stream opened" This always triggered the mentioned race condition. We need to figure out the best approach to avoid this case. --- tests/test_context_streams.py | 54 +++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 tests/test_context_streams.py diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py new file mode 100644 index 0000000..3d00564 --- /dev/null +++ b/tests/test_context_streams.py @@ -0,0 +1,54 @@ +''' +``async with ():`` inlined context-stream cancellation testing. + +Verify the we raise errors when streams are opened prior to sync-opening +a ``tractor.Context`` beforehand. + +''' +import trio +from trio.lowlevel import current_task +import tractor + + +@tractor.context +async def never_open_stream( + + ctx: tractor.Context, + +) -> None: + '''Bidir streaming endpoint which will stream + back any sequence it is sent item-wise. + + ''' + await ctx.started() + await trio.sleep_forever() + + +def test_no_far_end_stream_opened(): + ''' + This should exemplify the bug from: + https://github.com/goodboy/tractor/issues/265 + + ''' + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'starts_no_stream', + enable_modules=[__name__], + ) + + async with ( + portal.open_context( + never_open_stream,) as (ctx, sent), + ctx.open_stream() as stream, + ): + assert sent is None + + # XXX: so the question is whether + # this should error if the far end + # has not yet called `ctx.open_stream()`? + # If we decide to do that we need a synchronization + # message which is sent from that call? + await stream.send('yo') + + trio.run(main) From 3f6099f161be1a8b4a5f8399052428508faad876 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 22:47:31 -0500 Subject: [PATCH 03/31] Add a double started error checking test --- tests/test_context_streams.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py index 3d00564..b6e2a8c 100644 --- a/tests/test_context_streams.py +++ b/tests/test_context_streams.py @@ -5,11 +5,39 @@ Verify the we raise errors when streams are opened prior to sync-opening a ``tractor.Context`` beforehand. ''' +import pytest import trio from trio.lowlevel import current_task import tractor +@tractor.context +async def really_started( + ctx: tractor.Context, +) -> None: + await ctx.started() + try: + await ctx.started() + except RuntimeError as err: + raise + + +def test_started_called_more_then_once(): + + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'too_much_starteds', + enable_modules=[__name__], + ) + + async with portal.open_context(really_started) as (ctx, sent): + pass + + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + @tractor.context async def never_open_stream( From c5c3f7e78973e9d22fee73c241910d270a8b221c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Dec 2021 14:27:04 -0500 Subject: [PATCH 04/31] Use `tractor.Context` throughout the runtime core Instead of tracking feeder mem chans per RPC dialog, store `Context` instances which (now) hold refs to the underlying RPC-task feeder chans and track them inside a `Actor._contexts` map. This begins a transition to making the "context" idea the primitive abstraction for representing messaging dialogs between tasks in different memory domains (i.e. usually separate processes). A slew of changes made this possible: - change `Actor.get_memchans()` -> `.get_context()`. - Add new `Context._send_chan` and `._recv_chan` vars. - implicitly create a new context on every `Actor.send_cmd()` call. - use the context created by `.send_cmd()` in `Portal.open_context()` instead of manually creating one. - call `Actor.get_context()` inside tasks run from `._invoke()` such that feeder chans are implicitly created for callee tasks thus fixing the bug #265. NB: We might change some of the internal semantics to do with *when* the feeder chans are actually created to denote whether or not a far end task is actually *read to receive* messages. For example, in the cases where it **never** will be ready to receive messages (one-way streaming, a context that never opens a stream, etc.) we will likely want some kind of error or at least warning to the caller that messages can't be sent (yet). --- tractor/_actor.py | 90 ++++++++++++++++++++++--------------------- tractor/_portal.py | 38 ++++++++++-------- tractor/_streaming.py | 46 ++++++++++++++-------- 3 files changed, 99 insertions(+), 75 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index de86df1..3ea99ab 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -50,6 +50,7 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: dict[str, Any], + is_rpc: bool = True, task_status: TaskStatus[ Union[trio.CancelScope, BaseException] @@ -68,9 +69,10 @@ async def _invoke( tb = None cancel_scope = trio.CancelScope() + # activated cancel scope ref cs: Optional[trio.CancelScope] = None - ctx = Context(chan, cid) + ctx = actor.get_context(chan, cid) context: bool = False if getattr(func, '_tractor_stream_function', False): @@ -379,19 +381,19 @@ class Actor: self._no_more_peers.set() self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks.set() + # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: dict[ Tuple[Channel, str], Tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} - # map {uids -> {callids -> waiter queues}} - self._cids2qs: dict[ + + # map {actor uids -> Context} + self._contexts: dict[ Tuple[Tuple[str, str], str], - Tuple[ - trio.abc.SendChannel[Any], - trio.abc.ReceiveChannel[Any] - ] + Context ] = {} + self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ @@ -567,17 +569,7 @@ class Actor: await local_nursery.exited.wait() - # channel cleanup sequence - - # for (channel, cid) in self._rpc_tasks.copy(): - # if channel is chan: - # with trio.CancelScope(shield=True): - # await self._cancel_task(cid, channel) - - # # close all consumer side task mem chans - # send_chan, _ = self._cids2qs[(chan.uid, cid)] - # assert send_chan.cid == cid # type: ignore - # await send_chan.aclose() + # ``Channel`` teardown and closure sequence # Drop ref to channel so it can be gc-ed and disconnected log.runtime(f"Releasing channel {chan} from {chan.uid}") @@ -621,19 +613,15 @@ class Actor: ) -> None: """Push an RPC result to the local consumer's queue. """ - # actorid = chan.uid assert chan.uid, f"`chan.uid` can't be {chan.uid}" - send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] - assert send_chan.cid == cid # type: ignore + ctx = self._contexts[(chan.uid, cid)] + send_chan = ctx._send_chan + # TODO: relaying far end context errors to the local + # context through nursery raising? # if 'error' in msg: - # ctx = getattr(recv_chan, '_ctx', None) - # if ctx: - # ctx._error_from_remote_msg(msg) - + # ctx._error_from_remote_msg(msg) # log.runtime(f"{send_chan} was terminated at remote end") - # # indicate to consumer that far end has stopped - # return await send_chan.aclose() try: log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") @@ -651,23 +639,35 @@ class Actor: # so cancel the far end streaming task log.warning(f"{send_chan} consumer is already closed") - def get_memchans( + def get_context( self, - actorid: Tuple[str, str], - cid: str + chan: Channel, + cid: str, + max_buffer_size: int = 2**6, - ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: + ) -> Context: + ''' + Look up or create a new inter-actor-task-IPC-linked task + "context" which encapsulates the local task's scheduling + enviroment including a ``trio`` cancel scope, a pair of IPC + messaging "feeder" channels, and an RPC id unique to the + task-as-function invocation. - log.runtime(f"Getting result queue for {actorid} cid {cid}") + ''' + log.runtime(f"Getting result queue for {chan.uid} cid {cid}") try: - send_chan, recv_chan = self._cids2qs[(actorid, cid)] + ctx = self._contexts[(chan.uid, cid)] except KeyError: - send_chan, recv_chan = trio.open_memory_channel(2**6) - send_chan.cid = cid # type: ignore - recv_chan.cid = cid # type: ignore - self._cids2qs[(actorid, cid)] = send_chan, recv_chan + send_chan, recv_chan = trio.open_memory_channel(max_buffer_size) + ctx = Context( + chan, + cid, + _send_chan=send_chan, + _recv_chan=recv_chan, + ) + self._contexts[(chan.uid, cid)] = ctx - return send_chan, recv_chan + return ctx async def send_cmd( self, @@ -675,19 +675,21 @@ class Actor: ns: str, func: str, kwargs: dict - ) -> Tuple[str, trio.abc.ReceiveChannel]: + + ) -> Tuple[str, trio.abc.MemoryReceiveChannel]: ''' - Send a ``'cmd'`` message to a remote actor and return a - caller id and a ``trio.Queue`` that can be used to wait for - responses delivered by the local message processing loop. + Send a ``'cmd'`` message to a remote actor and return a caller + id and a ``trio.MemoryReceiveChannel`` message "feeder" channel + that can be used to wait for responses delivered by the local + runtime's message processing loop. ''' cid = str(uuid.uuid4()) assert chan.uid - send_chan, recv_chan = self.get_memchans(chan.uid, cid) + ctx = self.get_context(chan, cid) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return cid, recv_chan + return ctx.cid, ctx._recv_chan async def _process_messages( self, diff --git a/tractor/_portal.py b/tractor/_portal.py index 70339fa..89bd3a9 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -109,16 +109,20 @@ class Portal: async def _submit( self, + ns: str, func: str, kwargs, + ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: - """Submit a function to be scheduled and run by actor, return the - associated caller id, response queue, response type str, - first message packet as a tuple. + ''' + Submit a function to be scheduled and run by actor, return the + associated caller id, response queue, response type str, first + message packet as a tuple. This is an async call. - """ + + ''' # ship a function call request to the remote actor cid, recv_chan = await self.actor.send_cmd( self.channel, ns, func, kwargs) @@ -244,7 +248,8 @@ class Portal: trio.BrokenResourceError, ): log.cancel( - f"{self.channel} for {self.channel.uid} was already closed or broken?") + f"{self.channel} for {self.channel.uid} was already " + "closed or broken?") return False async def run_from_ns( @@ -392,7 +397,8 @@ class Portal: **kwargs, ) -> AsyncGenerator[Tuple[Context, Any], None]: - '''Open an inter-actor task context. + ''' + Open an inter-actor task context. This is a synchronous API which allows for deterministic setup/teardown of a remote task. The yielded ``Context`` further @@ -433,19 +439,20 @@ class Portal: raise _err: Optional[BaseException] = None + + # this should already have been created from the + # ``._submit()`` call above. + ctx = self.actor.get_context(self.channel, cid) + # pairs with handling in ``Actor._push_result()`` + assert ctx._recv_chan is recv_chan + ctx._portal = self + # deliver context instance and .started() msg value in open tuple. try: async with trio.open_nursery() as scope_nursery: - ctx = Context( - self.channel, - cid, - _portal=self, - _recv_chan=recv_chan, - _scope_nursery=scope_nursery, - ) + ctx._scope_nursery = scope_nursery - # pairs with handling in ``Actor._push_result()`` - # recv_chan._ctx = ctx + # do we need this? # await trio.lowlevel.checkpoint() yield ctx, first @@ -496,6 +503,7 @@ class Portal: # we tear down the runtime feeder chan last # to avoid premature stream clobbers. if recv_chan is not None: + # should we encapsulate this in the context api? await recv_chan.aclose() if _err: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index af9ffaf..29be554 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -32,9 +32,10 @@ log = get_logger(__name__) class ReceiveMsgStream(trio.abc.ReceiveChannel): - '''A IPC message stream for receiving logically sequenced values - over an inter-actor ``Channel``. This is the type returned to - a local task which entered either ``Portal.open_stream_from()`` or + ''' + A IPC message stream for receiving logically sequenced values over + an inter-actor ``Channel``. This is the type returned to a local + task which entered either ``Portal.open_stream_from()`` or ``Context.open_stream()``. Termination rules: @@ -177,7 +178,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # In the bidirectional case, `Context.open_stream()` will create # the `Actor._cids2qs` entry from a call to - # `Actor.get_memchans()` and will send the stop message in + # `Actor.get_context()` and will send the stop message in # ``__aexit__()`` on teardown so it **does not** need to be # called here. if not self._ctx._portal: @@ -302,29 +303,38 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): @dataclass class Context: - '''An inter-actor task communication context. + ''' + An inter-actor, ``trio`` task communication context. Allows maintaining task or protocol specific state between 2 communicating actor tasks. A unique context is created on the callee side/end for every request to a remote actor from a portal. A context can be cancelled and (possibly eventually restarted) from - either side of the underlying IPC channel. - - A context can be used to open task oriented message streams and can - be thought of as an IPC aware inter-actor cancel scope. + either side of the underlying IPC channel, open task oriented + message streams and acts as an IPC aware inter-actor-task cancel + scope. ''' chan: Channel cid: str + # these are the "feeder" channels for delivering + # message values to the local task from the runtime + # msg processing loop. + _recv_chan: Optional[trio.MemoryReceiveChannel] = None + _send_chan: Optional[trio.MemorySendChannel] = None + # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa - _recv_chan: Optional[trio.MemoryReceiveChannel] = None _result: Optional[Any] = False + _remote_func_type: str = None + + # status flags _cancel_called: bool = False _started_called: bool = False _started_received: bool = False + _stream_opened: bool = False # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None @@ -424,7 +434,8 @@ class Context: self, ) -> AsyncGenerator[MsgStream, None]: - '''Open a ``MsgStream``, a bi-directional stream connected to the + ''' + Open a ``MsgStream``, a bi-directional stream connected to the cross-actor (far end) task for this ``Context``. This context manager must be entered on both the caller and @@ -467,29 +478,32 @@ class Context: # to send a stop from the caller to the callee in the # single-direction-stream case you'll get a lookup error # currently. - _, recv_chan = actor.get_memchans( - self.chan.uid, - self.cid + ctx = actor.get_context( + self.chan, + self.cid, ) + assert ctx is self # XXX: If the underlying channel feeder receive mem chan has # been closed then likely client code has already exited # a ``.open_stream()`` block prior or there was some other # unanticipated error or cancellation from ``trio``. - if recv_chan._closed: + if ctx._recv_chan._closed: raise trio.ClosedResourceError( 'The underlying channel for this stream was already closed!?') async with MsgStream( ctx=self, - rx_chan=recv_chan, + rx_chan=ctx._recv_chan, ) as rchan: if self._portal: self._portal._streams.add(rchan) try: + self._stream_opened = True + # ensure we aren't cancelled before delivering # the stream # await trio.lowlevel.checkpoint() From 872b24aeddd48a05991eef7a652a9b50340c6798 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Dec 2021 14:49:08 -0500 Subject: [PATCH 05/31] Prove we've fixed #265 --- tests/test_context_streams.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py index b6e2a8c..e1d4de4 100644 --- a/tests/test_context_streams.py +++ b/tests/test_context_streams.py @@ -79,4 +79,9 @@ def test_no_far_end_stream_opened(): # message which is sent from that call? await stream.send('yo') + # without this we block waiting on the child side + await ctx.cancel() + + await portal.cancel_actor() + trio.run(main) From d307eab1187c9804aa1f837a5edd3476489e5005 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Dec 2021 16:51:15 -0500 Subject: [PATCH 06/31] Rework `Actor.send_cmd()` to `.start_remote_task()` This more formally declares the runtime's remote task startingn API and uses it throughout all the dependent `Portal` API methods. Allows dropping `Portal._submit()` and simplifying `.run_in_actor()` style result waiting to be delegated to the context APIs at remote task `return` response time. We now also track the remote entrypoint "type` as `Context._remote_func_type`. --- tractor/_actor.py | 38 ++++++++--- tractor/_portal.py | 143 +++++++++++++++++------------------------- tractor/_streaming.py | 9 +-- 3 files changed, 92 insertions(+), 98 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 3ea99ab..36f0092 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -616,6 +616,7 @@ class Actor: assert chan.uid, f"`chan.uid` can't be {chan.uid}" ctx = self._contexts[(chan.uid, cid)] send_chan = ctx._send_chan + assert send_chan # TODO: relaying far end context errors to the local # context through nursery raising? @@ -655,9 +656,13 @@ class Actor: ''' log.runtime(f"Getting result queue for {chan.uid} cid {cid}") + actor_uid = chan.uid + assert actor_uid try: - ctx = self._contexts[(chan.uid, cid)] + ctx = self._contexts[(actor_uid, cid)] except KeyError: + send_chan: trio.MemorySendChannel + recv_chan: trio.MemoryReceiveChannel send_chan, recv_chan = trio.open_memory_channel(max_buffer_size) ctx = Context( chan, @@ -665,23 +670,25 @@ class Actor: _send_chan=send_chan, _recv_chan=recv_chan, ) - self._contexts[(chan.uid, cid)] = ctx + self._contexts[(actor_uid, cid)] = ctx return ctx - async def send_cmd( + async def start_remote_task( self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> Tuple[str, trio.abc.MemoryReceiveChannel]: + ) -> Context: ''' - Send a ``'cmd'`` message to a remote actor and return a caller - id and a ``trio.MemoryReceiveChannel`` message "feeder" channel - that can be used to wait for responses delivered by the local - runtime's message processing loop. + Send a ``'cmd'`` message to a remote actor, which starts + a remote task-as-function entrypoint. + + Synchronously validates the endpoint type and return a caller + side task ``Context`` that can be used to wait for responses + delivered by the local runtime's message processing loop. ''' cid = str(uuid.uuid4()) @@ -689,7 +696,20 @@ class Actor: ctx = self.get_context(chan, cid) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return ctx.cid, ctx._recv_chan + + # Wait on first response msg and validate; this should be + # immediate. + first_msg = await ctx._recv_chan.receive() + functype = first_msg.get('functype') + + if 'error' in first_msg: + raise unpack_error(first_msg, chan) + + elif functype not in ('asyncfunc', 'asyncgen', 'context'): + raise ValueError(f"{first_msg} is an invalid response packet?") + + ctx._remote_func_type = functype + return ctx async def _process_messages( self, diff --git a/tractor/_portal.py b/tractor/_portal.py index 89bd3a9..dcd2ed5 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -6,7 +6,7 @@ concurrency linked tasks running in disparate memory domains. import importlib import inspect from typing import ( - Tuple, Any, Dict, Optional, Set, + Any, Optional, Callable, AsyncGenerator ) from functools import partial @@ -49,7 +49,7 @@ async def maybe_open_nursery( yield nursery -def func_deats(func: Callable) -> Tuple[str, str]: +def func_deats(func: Callable) -> tuple[str, str]: return ( func.__module__, func.__name__, @@ -98,72 +98,45 @@ class Portal: # during the portal's lifetime self._result_msg: Optional[dict] = None - # When this is set to a tuple returned from ``_submit()`` then + # When set to a ``Context`` (when _submit_for_result is called) # it is expected that ``result()`` will be awaited at some - # point. Set when _submit_for_result is called - self._expect_result: Optional[ - Tuple[str, Any, str, Dict[str, Any]] - ] = None - self._streams: Set[ReceiveMsgStream] = set() + # point. + self._expect_result: Optional[Context] = None + self._streams: set[ReceiveMsgStream] = set() self.actor = current_actor() - async def _submit( + async def _submit_for_result( self, - ns: str, func: str, - kwargs, - - ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: - ''' - Submit a function to be scheduled and run by actor, return the - associated caller id, response queue, response type str, first - message packet as a tuple. - - This is an async call. - - ''' - # ship a function call request to the remote actor - cid, recv_chan = await self.actor.send_cmd( - self.channel, ns, func, kwargs) - - # wait on first response msg and handle (this should be - # in an immediate response) - - first_msg = await recv_chan.receive() - functype = first_msg.get('functype') - - if 'error' in first_msg: - raise unpack_error(first_msg, self.channel) - - elif functype not in ('asyncfunc', 'asyncgen', 'context'): - raise ValueError(f"{first_msg} is an invalid response packet?") - - return cid, recv_chan, functype, first_msg - - async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: + **kwargs + ) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" - self._expect_result = await self._submit(ns, func, kwargs) + self._expect_result = await self.actor.start_remote_task( + self.channel, + ns, + func, + kwargs + ) async def _return_once( self, - cid: str, - recv_chan: trio.abc.ReceiveChannel, - resptype: str, - first_msg: dict + ctx: Context, ) -> dict[str, Any]: - assert resptype == 'asyncfunc' # single response - msg = await recv_chan.receive() + assert ctx._remote_func_type == 'asyncfunc' # single response + msg = await ctx._recv_chan.receive() return msg async def result(self) -> Any: - """Return the result(s) from the remote actor's "main" task. - """ + ''' + Return the result(s) from the remote actor's "main" task. + + ''' # Check for non-rpc errors slapped on the # channel for which we always raise exc = self.channel._exc @@ -182,7 +155,9 @@ class Portal: assert self._expect_result if self._result_msg is None: - self._result_msg = await self._return_once(*self._expect_result) + self._result_msg = await self._return_once( + self._expect_result + ) return _unwrap_msg(self._result_msg, self.channel) @@ -275,7 +250,12 @@ class Portal: ''' msg = await self._return_once( - *(await self._submit(namespace_path, function_name, kwargs)) + await self.actor.start_remote_task( + self.channel, + namespace_path, + function_name, + kwargs, + ) ) return _unwrap_msg(msg, self.channel) @@ -320,7 +300,12 @@ class Portal: return _unwrap_msg( await self._return_once( - *(await self._submit(fn_mod_path, fn_name, kwargs)), + await self.actor.start_remote_task( + self.channel, + fn_mod_path, + fn_name, + kwargs, + ) ), self.channel, ) @@ -342,27 +327,21 @@ class Portal: f'{async_gen_func} must be an async generator function!') fn_mod_path, fn_name = func_deats(async_gen_func) - ( - cid, - recv_chan, - functype, - first_msg - ) = await self._submit(fn_mod_path, fn_name, kwargs) - - # receive only stream - assert functype == 'asyncgen' - - ctx = Context( + ctx = await self.actor.start_remote_task( self.channel, - cid, - # do we need this to be closed implicitly? - # _recv_chan=recv_chan, - _portal=self + fn_mod_path, + fn_name, + kwargs ) + ctx._portal = self + + # ensure receive-only stream entrypoint + assert ctx._remote_func_type == 'asyncgen' + try: # deliver receive only stream async with ReceiveMsgStream( - ctx, recv_chan, + ctx, ctx._recv_chan, ) as rchan: self._streams.add(rchan) yield rchan @@ -396,7 +375,7 @@ class Portal: func: Callable, **kwargs, - ) -> AsyncGenerator[Tuple[Context, Any], None]: + ) -> AsyncGenerator[tuple[Context, Any], None]: ''' Open an inter-actor task context. @@ -415,14 +394,14 @@ class Portal: f'{func} must be an async generator function!') fn_mod_path, fn_name = func_deats(func) - - recv_chan: Optional[trio.MemoryReceiveChannel] = None - - cid, recv_chan, functype, first_msg = await self._submit( - fn_mod_path, fn_name, kwargs) - - assert functype == 'context' - msg = await recv_chan.receive() + ctx = await self.actor.start_remote_task( + self.channel, + fn_mod_path, + fn_name, + kwargs + ) + assert ctx._remote_func_type == 'context' + msg = await ctx._recv_chan.receive() try: # the "first" value here is delivered by the callee's @@ -439,12 +418,6 @@ class Portal: raise _err: Optional[BaseException] = None - - # this should already have been created from the - # ``._submit()`` call above. - ctx = self.actor.get_context(self.channel, cid) - # pairs with handling in ``Actor._push_result()`` - assert ctx._recv_chan is recv_chan ctx._portal = self # deliver context instance and .started() msg value in open tuple. @@ -502,9 +475,9 @@ class Portal: # operating *in* this scope to have survived # we tear down the runtime feeder chan last # to avoid premature stream clobbers. - if recv_chan is not None: + if ctx._recv_chan is not None: # should we encapsulate this in the context api? - await recv_chan.aclose() + await ctx._recv_chan.aclose() if _err: if ctx._cancel_called: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 29be554..79cc956 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -322,13 +322,14 @@ class Context: # these are the "feeder" channels for delivering # message values to the local task from the runtime # msg processing loop. - _recv_chan: Optional[trio.MemoryReceiveChannel] = None - _send_chan: Optional[trio.MemorySendChannel] = None + _recv_chan: trio.MemoryReceiveChannel + _send_chan: trio.MemorySendChannel + + _remote_func_type: Optional[str] = None # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa _result: Optional[Any] = False - _remote_func_type: str = None # status flags _cancel_called: bool = False @@ -474,7 +475,7 @@ class Context: ) # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.send_cmd()` so if you try + # caller side inside `Actor.start_remote_task()` so if you try # to send a stop from the caller to the callee in the # single-direction-stream case you'll get a lookup error # currently. From 6751349987ceea7a7a0b05f5edc55e81a5d42ffb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 18:28:02 -0500 Subject: [PATCH 07/31] Add a stream overrun exception --- tractor/_exceptions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 9c3edac..79bcd3c 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -61,6 +61,10 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" +class StreamOverrun(trio.TooSlowError): + "This stream was overrun by sender" + + def pack_error( exc: BaseException, tb=None, From 92b540d518ac11a6e0eacb27342737b78a85f01f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 19:19:53 -0500 Subject: [PATCH 08/31] Add internal msg stream backpressure controls In preparation for supporting both backpressure detection (through an optional error) as well as control over the msg channel buffer size, add internal configuration flags for both to contexts. Also adjust `Context._err_on_from_remote_msg()` -> `._maybe..` such that it can be called and will only raise if a scope nursery has been set. Add a `Context._error` for stashing the remote task's error that may be delivered in an `'error'` message. --- tractor/_streaming.py | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 79cc956..11a268a 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -190,7 +190,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # was it shouldn't matter since it's unlikely a user # will try to re-use a stream after attemping to close # it). - await self._ctx.send_stop() + with trio.CancelScope(shield=True): + await self._ctx.send_stop() except ( trio.BrokenResourceError, @@ -283,11 +284,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): class MsgStream(ReceiveMsgStream, trio.abc.Channel): - """ + ''' Bidirectional message stream for use within an inter-actor actor ``Context```. - """ + ''' async def send( self, data: Any @@ -297,6 +298,8 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' # if self._eoc: # raise trio.ClosedResourceError('This stream is already ded') + # if self._ctx._error: + # raise self._ctx._error await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -330,6 +333,7 @@ class Context: # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa _result: Optional[Any] = False + _error: Optional[BaseException] = None # status flags _cancel_called: bool = False @@ -340,6 +344,8 @@ class Context: # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None + _backpressure: bool = False + async def send_yield(self, data: Any) -> None: warnings.warn( @@ -353,23 +359,28 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) - def _error_from_remote_msg( + def _maybe_error_from_remote_msg( self, msg: Dict[str, Any], ) -> None: - '''Unpack and raise a msg error into the local scope + ''' + Unpack and raise a msg error into the local scope nursery for this context. Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. ''' - assert self._scope_nursery + self._error = unpack_error(msg, self.chan) - async def raiser(): - raise unpack_error(msg, self.chan) + if self._scope_nursery: - self._scope_nursery.start_soon(raiser) + async def raiser(): + __tracebackhide__ = True + raise self._error + + if not self._scope_nursery.cancel_scope.cancel_called: + self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: '''Cancel this inter-actor-task context. @@ -433,6 +444,8 @@ class Context: async def open_stream( self, + backpressure: bool = False, + msg_buffer_size: Optional[int] = None, ) -> AsyncGenerator[MsgStream, None]: ''' @@ -482,7 +495,9 @@ class Context: ctx = actor.get_context( self.chan, self.cid, + msg_buffer_size=msg_buffer_size, ) + ctx._backpressure = backpressure assert ctx is self # XXX: If the underlying channel feeder receive mem chan has From 2680a9473dc25c9bad32ceb2537a0d2425e7ddbe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 19:28:00 -0500 Subject: [PATCH 09/31] Always set `Context._portal` on the caller task side --- tractor/_portal.py | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index dcd2ed5..82e5a5c 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -249,14 +249,14 @@ class Portal: internals. ''' - msg = await self._return_once( - await self.actor.start_remote_task( - self.channel, - namespace_path, - function_name, - kwargs, - ) + ctx = await self.actor.start_remote_task( + self.channel, + namespace_path, + function_name, + kwargs, ) + ctx._portal = self + msg = await self._return_once(ctx) return _unwrap_msg(msg, self.channel) async def run( @@ -298,15 +298,15 @@ class Portal: fn_mod_path, fn_name = func_deats(func) + ctx = await self.actor.start_remote_task( + self.channel, + fn_mod_path, + fn_name, + kwargs, + ) + ctx._portal = self return _unwrap_msg( - await self._return_once( - await self.actor.start_remote_task( - self.channel, - fn_mod_path, - fn_name, - kwargs, - ) - ), + await self._return_once(ctx), self.channel, ) @@ -385,7 +385,6 @@ class Portal: and synchronized final result collection. See ``tractor.Context``. ''' - # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -393,6 +392,8 @@ class Portal: raise TypeError( f'{func} must be an async generator function!') + __tracebackhide__ = True + fn_mod_path, fn_name = func_deats(func) ctx = await self.actor.start_remote_task( self.channel, @@ -407,6 +408,7 @@ class Portal: # the "first" value here is delivered by the callee's # ``Context.started()`` call. first = msg['started'] + ctx._started_called = True except KeyError: assert msg.get('cid'), ("Received internal error at context?") @@ -458,9 +460,10 @@ class Portal: _err = err # the context cancels itself on any cancel # causing error. - log.cancel(f'Context {ctx} sending cancel to far end') - with trio.CancelScope(shield=True): - await ctx.cancel() + log.cancel( + f'Context to {self.channel.uid} sending cancel request..') + + await ctx.cancel() raise finally: From 185dbc7e3ff7939f42610bd1fbfcd1c85f9dc394 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 19:31:41 -0500 Subject: [PATCH 10/31] Disable msg stream backpressure by default Half of portal API usage requires a 1 message response (`.run()`, `.run_in_actor()`) and the streaming APIs should probably be explicitly enabled for backpressure if desired by the user. This makes more sense in (psuedo) realtime systems where it's better to notify on a block then freeze without notice. Make this default behaviour with a new error to be raised: `tractor._exceptions.StreamOverrun` when a sender overruns a stream by the default size (2**6 for now). The old behavior can be enabled with `Context.open_stream(backpressure=True)` but now with warning log messages when there are overruns. Add task-linked-context error propagation using a "nursery raising" technique such that if either end of context linked pair of tasks errors, that error can be relayed to other side and raised as a form of interrupt at the receiving task's next `trio` checkpoint. This enables reliable error relay without expecting the (error) receiving task to call an API which would raise the remote exception (which it might never currently if using `tractor.MsgStream` APIs). Further internal implementation details: - define the default msg buffer size as `Actor.msg_buffer_size` - expose a `msg_buffer_size: int` kwarg from `Actor.get_context()` - maybe raise aforementioned context errors using `Context._maybe_error_from_remote_msg()` inside `Actor._push_result()` - support optional backpressure on a stream when pushing messages in `Actor._push_result()` - in `_invote()` handle multierrors raised from a `@tractor.context` entrypoint as being potentially caused by a relayed error from the remote caller task, if `Context._error` has been set then raise that error inside the `RemoteActorError` that will be relayed back to that caller more or less proxying through the source side error back to its origin. --- tractor/_actor.py | 99 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 76 insertions(+), 23 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 36f0092..b78b872 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -32,6 +32,7 @@ from ._exceptions import ( is_multi_cancelled, ContextCancelled, TransportClosed, + StreamOverrun, ) from . import _debug from ._discovery import get_arbiter @@ -161,16 +162,27 @@ async def _invoke( # context func with support for bi-dir streaming await chan.send({'functype': 'context', 'cid': cid}) - async with trio.open_nursery() as scope_nursery: - ctx._scope_nursery = scope_nursery - cs = scope_nursery.cancel_scope - task_status.started(cs) - try: + try: + async with trio.open_nursery() as scope_nursery: + ctx._scope_nursery = scope_nursery + cs = scope_nursery.cancel_scope + task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) - except trio.Cancelled as err: - tb = err.__traceback__ - if cs.cancelled_caught: + except trio.Cancelled as err: + tb = err.__traceback__ + if ctx._error is not None: + tb = ctx._error.__traceback__ + raise ctx._error + + except trio.MultiError as err: + if ctx._error is not None: + tb = ctx._error.__traceback__ + raise ctx._error from err + else: + raise + + if cs.cancelled_caught or ctx._error: # TODO: pack in ``trio.Cancelled.__traceback__`` here # so they can be unwrapped and displayed on the caller @@ -314,6 +326,7 @@ class Actor: # ugh, we need to get rid of this and replace with a "registry" sys # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False + msg_buffer_size: int = 2**6 # nursery placeholders filled in by `_async_main()` after fork _root_n: Optional[trio.Nursery] = None @@ -548,7 +561,7 @@ class Actor: # now in a cancelled condition) when the local runtime here # is now cancelled while (presumably) in the middle of msg # loop processing. - with trio.move_on_after(0.1) as cs: + with trio.move_on_after(0.5) as cs: cs.shield = True # Attempt to wait for the far end to close the channel # and bail after timeout (2-generals on closure). @@ -611,23 +624,54 @@ class Actor: cid: str, msg: dict[str, Any], ) -> None: - """Push an RPC result to the local consumer's queue. - """ + ''' + Push an RPC result to the local consumer's queue. + + ''' assert chan.uid, f"`chan.uid` can't be {chan.uid}" ctx = self._contexts[(chan.uid, cid)] send_chan = ctx._send_chan assert send_chan - # TODO: relaying far end context errors to the local - # context through nursery raising? - # if 'error' in msg: - # ctx._error_from_remote_msg(msg) - # log.runtime(f"{send_chan} was terminated at remote end") + if msg.get('error'): + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. + log.warning(f'Remote context for {chan.uid}:{cid} errored') + ctx._maybe_error_from_remote_msg(msg) try: log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") - # maintain backpressure - await send_chan.send(msg) + + # XXX: we do **not** maintain backpressure and instead + # opt to relay stream overrun errors to the sender. + try: + send_chan.send_nowait(msg) + except trio.WouldBlock: + log.warning(f'Caller task {cid} was overrun!?') + if ctx._backpressure: + await send_chan.send(msg) + else: + try: + raise StreamOverrun( + f'Context stream {cid} for {chan.uid} was overrun!' + ) + except StreamOverrun as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) except trio.BrokenResourceError: # TODO: what is the right way to handle the case where the @@ -644,7 +688,7 @@ class Actor: self, chan: Channel, cid: str, - max_buffer_size: int = 2**6, + msg_buffer_size: Optional[int] = None, ) -> Context: ''' @@ -660,10 +704,17 @@ class Actor: assert actor_uid try: ctx = self._contexts[(actor_uid, cid)] + + # adjust buffer size if specified + state = ctx._send_chan._state + if msg_buffer_size and state.max_buffer_size != msg_buffer_size: + state.max_buffer_size = msg_buffer_size + except KeyError: send_chan: trio.MemorySendChannel recv_chan: trio.MemoryReceiveChannel - send_chan, recv_chan = trio.open_memory_channel(max_buffer_size) + send_chan, recv_chan = trio.open_memory_channel( + msg_buffer_size or self.msg_buffer_size) ctx = Context( chan, cid, @@ -679,7 +730,8 @@ class Actor: chan: Channel, ns: str, func: str, - kwargs: dict + kwargs: dict, + msg_buffer_size: Optional[int] = None, ) -> Context: ''' @@ -693,7 +745,7 @@ class Actor: ''' cid = str(uuid.uuid4()) assert chan.uid - ctx = self.get_context(chan, cid) + ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) @@ -743,7 +795,8 @@ class Actor: if msg is None: # loop terminate sentinel log.cancel( - f"Cancelling all tasks for {chan} from {chan.uid}") + f"Channerl to {chan.uid} terminated?\n" + "Cancelling all associated tasks..") for (channel, cid) in self._rpc_tasks.copy(): if channel is chan: From 2b05ffcc23968f614b492d5f9caf7c2caf85bf7d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 19:50:39 -0500 Subject: [PATCH 11/31] Add context stream overrun tests --- tests/test_context_streams.py | 84 +++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 19 deletions(-) diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py index e1d4de4..48ccbc8 100644 --- a/tests/test_context_streams.py +++ b/tests/test_context_streams.py @@ -5,9 +5,10 @@ Verify the we raise errors when streams are opened prior to sync-opening a ``tractor.Context`` beforehand. ''' +from itertools import count + import pytest import trio -from trio.lowlevel import current_task import tractor @@ -18,7 +19,7 @@ async def really_started( await ctx.started() try: await ctx.started() - except RuntimeError as err: + except RuntimeError: raise @@ -32,9 +33,10 @@ def test_started_called_more_then_once(): ) async with portal.open_context(really_started) as (ctx, sent): - pass + await trio.sleep(1) + # pass - with pytest.raises(tractor.RemoteActorError) as excinfo: + with pytest.raises(tractor.RemoteActorError): trio.run(main) @@ -44,20 +46,50 @@ async def never_open_stream( ctx: tractor.Context, ) -> None: - '''Bidir streaming endpoint which will stream - back any sequence it is sent item-wise. + ''' + Context which never opens a stream and blocks. ''' await ctx.started() await trio.sleep_forever() -def test_no_far_end_stream_opened(): +@tractor.context +async def keep_sending_from_callee( + + ctx: tractor.Context, + +) -> None: + ''' + Send endlessly on the calleee stream. + + ''' + await ctx.started() + async with ctx.open_stream() as stream: + for msg in count(): + await stream.send(msg) + await trio.sleep(0.01) + + +@pytest.mark.parametrize( + 'overrun_by', + [ + (None, 0, never_open_stream), # use default settings + ('caller', 1, never_open_stream), + ('callee', 0, keep_sending_from_callee), + ], + ids='overrun_condition_by={}'.format, +) +def test_one_end_stream_not_opened(overrun_by): ''' This should exemplify the bug from: https://github.com/goodboy/tractor/issues/265 ''' + overrunner, buf_size_increase, entrypoint = overrun_by + from tractor._actor import Actor + buf_size = buf_size_increase + Actor.msg_buffer_size + async def main(): async with tractor.open_nursery() as n: portal = await n.start_actor( @@ -65,23 +97,37 @@ def test_no_far_end_stream_opened(): enable_modules=[__name__], ) - async with ( - portal.open_context( - never_open_stream,) as (ctx, sent), - ctx.open_stream() as stream, - ): + async with portal.open_context(entrypoint) as (ctx, sent): assert sent is None - # XXX: so the question is whether - # this should error if the far end - # has not yet called `ctx.open_stream()`? - # If we decide to do that we need a synchronization - # message which is sent from that call? - await stream.send('yo') + if overrunner in (None, 'caller'): + + async with ctx.open_stream() as stream: + for i in range(buf_size - 1): + await stream.send('yo') + + else: + # callee overruns caller case so we do nothing here + await trio.sleep_forever() # without this we block waiting on the child side await ctx.cancel() await portal.cancel_actor() - trio.run(main) + # 2 overrun cases and the no overrun case (which pushes right up to + # the msg limit) + if overrunner == 'caller': + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + assert excinfo.value.type == tractor._exceptions.StreamOverrun + + elif overrunner == 'callee': + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + assert excinfo.value.type == tractor.RemoteActorError + + else: + trio.run(main) From 7b9d410c4db63a7d739a9a024f661165c8f4f912 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 19:52:09 -0500 Subject: [PATCH 12/31] Adjust remaining examples and tests for non-backpressure default --- examples/full_fledged_streaming_service.py | 2 +- tests/test_2way.py | 13 ++++++++----- tests/test_streaming.py | 2 +- tests/test_task_broadcasting.py | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 31eff62..1650b58 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -7,7 +7,7 @@ import tractor async def stream_data(seed): for i in range(seed): yield i - await trio.sleep(0) # trigger scheduler + await trio.sleep(0.0001) # trigger scheduler # this is the third actor; the aggregator diff --git a/tests/test_2way.py b/tests/test_2way.py index c038ae4..410c299 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -386,7 +386,8 @@ async def cancel_self( @tractor_test async def test_callee_cancels_before_started(): - '''callee calls `Context.cancel()` while streaming and caller + ''' + Callee calls `Context.cancel()` while streaming and caller sees stream terminated in `ContextCancelled`. ''' @@ -420,9 +421,10 @@ async def simple_rpc( data: int, ) -> None: - """Test a small ping-pong server. + ''' + Test a small ping-pong server. - """ + ''' # signal to parent that we're up await ctx.started(data + 1) @@ -480,9 +482,10 @@ async def simple_rpc_with_forloop( [simple_rpc, simple_rpc_with_forloop], ) def test_simple_rpc(server_func, use_async_for): - """The simplest request response pattern. + ''' + The simplest request response pattern. - """ + ''' async def main(): async with tractor.open_nursery() as n: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 38fbee4..baee54e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -132,7 +132,7 @@ async def stream_data(seed): yield i # trigger scheduler to simulate practical usage - await trio.sleep(0) + await trio.sleep(0.0001) # this is the third actor; the aggregator diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index b18a40e..4c3d1ff 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -83,7 +83,7 @@ async def open_sequence_streamer( ) as (ctx, first): assert first is None - async with ctx.open_stream() as stream: + async with ctx.open_stream(backpressure=True) as stream: yield stream await portal.cancel_actor() From 41a3e6a9ca0bd04d46f397f2d69f15c82f15c311 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 20:00:40 -0500 Subject: [PATCH 13/31] Type check fixes --- tractor/_actor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index b78b872..1f7d9f5 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -182,6 +182,7 @@ async def _invoke( else: raise + assert cs if cs.cancelled_caught or ctx._error: # TODO: pack in ``trio.Cancelled.__traceback__`` here @@ -706,7 +707,7 @@ class Actor: ctx = self._contexts[(actor_uid, cid)] # adjust buffer size if specified - state = ctx._send_chan._state + state = ctx._send_chan._state # type: ignore if msg_buffer_size and state.max_buffer_size != msg_buffer_size: state.max_buffer_size = msg_buffer_size From f3432bd8fbbc31725bc4ccb6ad1887fb08880173 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 20:02:55 -0500 Subject: [PATCH 14/31] Enable bp on clustering test --- tests/test_clustering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_clustering.py b/tests/test_clustering.py index ba8052f..56e629b 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -14,7 +14,7 @@ MESSAGE = 'tractoring at full speed' @tractor.context async def worker(ctx: tractor.Context) -> None: await ctx.started() - async with ctx.open_stream() as stream: + async with ctx.open_stream(backpressure=True) as stream: async for msg in stream: # do something with msg print(msg) From 4ea5c9b5db782d931fac67b56ef223c4a9fb2cde Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 10:52:18 -0500 Subject: [PATCH 15/31] Pop context on `.open_context()` exit --- tractor/_portal.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tractor/_portal.py b/tractor/_portal.py index 82e5a5c..bdfef3e 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -395,12 +395,14 @@ class Portal: __tracebackhide__ = True fn_mod_path, fn_name = func_deats(func) + ctx = await self.actor.start_remote_task( self.channel, fn_mod_path, fn_name, kwargs ) + assert ctx._remote_func_type == 'context' msg = await ctx._recv_chan.receive() @@ -497,6 +499,9 @@ class Portal: f'value from callee `{result}`' ) + # remove the context from runtime tracking + self.actor._contexts.pop((self.channel.uid, ctx.cid)) + @dataclass class LocalPortal: From b826ec81032d55565e5539230e0b687a270128c0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 10:57:58 -0500 Subject: [PATCH 16/31] Better idea, enable backpressure on opened streams Keeping it disabled on context open will help with detecting any stream connection which was never opened on one side of the task pair. In that case we can report that there was an overrun **and** a stream wasn't opened versus if the stream is explicitly configured not to use bp then we throw the standard overflow. Use `trio.Nursery._closed` to detect "closure" XD since it seems to be the most reliable way to determine if a spawn call will trigger a runtime error. --- tractor/_streaming.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 11a268a..6c57065 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -298,8 +298,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' # if self._eoc: # raise trio.ClosedResourceError('This stream is already ded') - # if self._ctx._error: - # raise self._ctx._error + + if self._ctx._error: + raise self._ctx._error # from None await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -309,6 +310,10 @@ class Context: ''' An inter-actor, ``trio`` task communication context. + NB: This class should never be instatiated directly, it is delivered + by either runtime machinery to a remotely started task or by entering + ``Portal.open_context()``. + Allows maintaining task or protocol specific state between 2 communicating actor tasks. A unique context is created on the callee side/end for every request to a remote actor from a portal. @@ -370,20 +375,26 @@ class Context: Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. + ''' self._error = unpack_error(msg, self.chan) + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? if self._scope_nursery: async def raiser(): - __tracebackhide__ = True - raise self._error + raise self._error from None - if not self._scope_nursery.cancel_scope.cancel_called: + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + if not self._scope_nursery._closed: # type: ignore self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: - '''Cancel this inter-actor-task context. + ''' + Cancel this inter-actor-task context. Request that the far side cancel it's current linked context, Timeout quickly in an attempt to sidestep 2-generals... @@ -444,7 +455,7 @@ class Context: async def open_stream( self, - backpressure: bool = False, + backpressure: Optional[bool] = True, msg_buffer_size: Optional[int] = None, ) -> AsyncGenerator[MsgStream, None]: @@ -555,7 +566,7 @@ class Context: try: self._result = msg['return'] break - except KeyError: + except KeyError as msgerr: if 'yield' in msg: # far end task is still streaming to us so discard @@ -569,7 +580,10 @@ class Context: # internal error should never get here assert msg.get('cid'), ( "Received internal error at portal?") - raise unpack_error(msg, self._portal.channel) + + raise unpack_error( + msg, self._portal.channel + ) from msgerr return self._result From 318027ebd1702c0c6de1278a41d451f504149238 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 11:38:47 -0500 Subject: [PATCH 17/31] Raise stream overruns on one side never opened A context stream overrun should normally never take place since if a stream is opened (via ``Context.open_stream()``) backpressure is applied on the message buffer (unless explicitly disabled by the ``backpressure=False`` flag) such that an overrun on the receiving task should result in blocking the (remote) sender task (eventually depending on the underlying ``MsgStream`` transport). Here we add a special error message that reports if one side never opened a stream and let's the user know in the overrun error message that they may be trying to push messages to a task that isn't ready to receive them. Further fixes / details: - pop any `Context` at the end of any `_invoke()` task that creates one and registers with the runtime. - ignore but warn about messages received for a context that either no longer exists or is unknown (guarding against crashes by malicious packets in the latter case) --- tractor/_actor.py | 71 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1f7d9f5..3a843e1 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -171,19 +171,17 @@ async def _invoke( except trio.Cancelled as err: tb = err.__traceback__ - if ctx._error is not None: - tb = ctx._error.__traceback__ - raise ctx._error - except trio.MultiError as err: + except trio.MultiError: + # if a context error was set then likely + # thei multierror was raised due to that if ctx._error is not None: - tb = ctx._error.__traceback__ - raise ctx._error from err - else: - raise + raise ctx._error from None + + raise assert cs - if cs.cancelled_caught or ctx._error: + if cs.cancelled_caught: # TODO: pack in ``trio.Cancelled.__traceback__`` here # so they can be unwrapped and displayed on the caller @@ -257,6 +255,11 @@ async def _invoke( task_status.started(err) finally: + assert chan.uid + ctx = actor._contexts.pop((chan.uid, cid)) + if ctx: + log.cancel(f'{ctx} was terminated') + # RPC task bookeeping try: scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) @@ -594,6 +597,10 @@ class Actor: log.runtime(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) + # for (uid, cid) in self._contexts.copy(): + # if chan.uid == uid: + # self._contexts.pop((uid, cid)) + log.runtime(f"Peers is {self._peers}") if not self._peers: # no more channels connected @@ -629,12 +636,20 @@ class Actor: Push an RPC result to the local consumer's queue. ''' - assert chan.uid, f"`chan.uid` can't be {chan.uid}" - ctx = self._contexts[(chan.uid, cid)] + uid = chan.uid + assert uid, f"`chan.uid` can't be {uid}" + try: + ctx = self._contexts[(uid, cid)] + except KeyError: + log.warning( + f'Ignoring {msg} for unknwon context with {uid}') + return + send_chan = ctx._send_chan assert send_chan - if msg.get('error'): + error = msg.get('error') + if error: # If this is an error message from a context opened by # ``Portal.open_context()`` we want to interrupt any ongoing # (child) tasks within that context to be notified of the remote @@ -650,7 +665,7 @@ class Actor: # (currently) that other portal APIs (``Portal.run()``, # ``.run_in_actor()``) do their own error checking at the point # of the call and result processing. - log.warning(f'Remote context for {chan.uid}:{cid} errored') + log.warning(f'Remote context for {chan.uid}:{cid} errored {msg}') ctx._maybe_error_from_remote_msg(msg) try: @@ -661,14 +676,36 @@ class Actor: try: send_chan.send_nowait(msg) except trio.WouldBlock: - log.warning(f'Caller task {cid} was overrun!?') + + # XXX: do we need this? + # if we're trying to push an error but we're in + # an overrun state we'll just get stuck sending + # the error that was sent to us back to it's sender + # instead of it actually being raises in the target + # task.. + # if error: + # raise unpack_error(msg, chan) from None + + uid = chan.uid + + lines = [ + 'Task context stream was overrun', + f'local task: {cid} @ {self.uid}', + f'remote sender: {chan.uid}', + ] + if not ctx._stream_opened: + lines.insert( + 1, + f'\n*** No stream open on {self.uid[0]} side! ***\n' + ) + + text = '\n'.join(lines) + log.warning(text) if ctx._backpressure: await send_chan.send(msg) else: try: - raise StreamOverrun( - f'Context stream {cid} for {chan.uid} was overrun!' - ) + raise StreamOverrun(text) from None except StreamOverrun as err: err_msg = pack_error(err) err_msg['cid'] = cid From 142083d81b2497b8c64a9d7d286619d7e94ebaa1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 11:53:15 -0500 Subject: [PATCH 18/31] Don't cancel the context on overrun cases --- tests/test_context_streams.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py index 48ccbc8..3117a45 100644 --- a/tests/test_context_streams.py +++ b/tests/test_context_streams.py @@ -97,22 +97,28 @@ def test_one_end_stream_not_opened(overrun_by): enable_modules=[__name__], ) - async with portal.open_context(entrypoint) as (ctx, sent): + async with portal.open_context( + entrypoint, + ) as (ctx, sent): assert sent is None if overrunner in (None, 'caller'): async with ctx.open_stream() as stream: for i in range(buf_size - 1): + await stream.send(i) + + if overrunner is None: + # without this we block waiting on the child side + await ctx.cancel() + + else: await stream.send('yo') else: # callee overruns caller case so we do nothing here await trio.sleep_forever() - # without this we block waiting on the child side - await ctx.cancel() - await portal.cancel_actor() # 2 overrun cases and the no overrun case (which pushes right up to From 58805a043089a0c33c2f4bb39d6346ada7c24096 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 12:17:26 -0500 Subject: [PATCH 19/31] Slight delay to avoid flaky bcast race --- tests/test_task_broadcasting.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 4c3d1ff..9b4258e 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -334,7 +334,7 @@ def test_ensure_slow_consumers_lag_out( if task.name == 'sub_1': # trigger checkpoint to clean out other subs - await trio.sleep(0) + await trio.sleep(0.01) # the non-lagger got # a ``trio.EndOfChannel`` @@ -401,7 +401,7 @@ def test_ensure_slow_consumers_lag_out( assert not tx._state.open_send_channels # check that "first" bcaster that we created - # above, never wass iterated and is thus overrun + # above, never was iterated and is thus overrun try: await brx.receive() except Lagged: From 1f8e1cccbb191053a2ed5f3251f4ee5b935b2ae6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 13:47:49 -0500 Subject: [PATCH 20/31] Only pop contexts on decorated entrypoints --- tractor/_actor.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 3a843e1..66b32c9 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -180,6 +180,14 @@ async def _invoke( raise + finally: + # XXX: only pop the context tracking if + # a ``@tractor.context`` entrypoint was called + assert chan.uid + ctx = actor._contexts.pop((chan.uid, cid)) + if ctx: + log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}') + assert cs if cs.cancelled_caught: @@ -255,15 +263,11 @@ async def _invoke( task_status.started(err) finally: - assert chan.uid - ctx = actor._contexts.pop((chan.uid, cid)) - if ctx: - log.cancel(f'{ctx} was terminated') - # RPC task bookeeping try: scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) is_complete.set() + except KeyError: if is_rpc: # If we're cancelled before the task returns then the @@ -696,7 +700,7 @@ class Actor: if not ctx._stream_opened: lines.insert( 1, - f'\n*** No stream open on {self.uid[0]} side! ***\n' + f'\n*** No stream open on `{self.uid[0]}` side! ***\n' ) text = '\n'.join(lines) From c9132de7dca3fb8a574529124c74ddb4b014b8e3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 15:52:23 -0500 Subject: [PATCH 21/31] Move maybe-raise-error-msg logic into context A context method handling all this logic makes the most sense since it contains all the state related to whether the error should be raised in a nursery scope or is expected to be raised by a consumer task which reads and processes the msg directly (via a `Portal` API call). This also makes it easy to always process remote errors even when there is no (stream) overrun condition. --- tractor/_actor.py | 107 ++++++++++++++++-------------------------- tractor/_streaming.py | 48 ++++++++++++++----- 2 files changed, 76 insertions(+), 79 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 66b32c9..df754c3 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -650,81 +650,56 @@ class Actor: return send_chan = ctx._send_chan - assert send_chan - error = msg.get('error') - if error: - # If this is an error message from a context opened by - # ``Portal.open_context()`` we want to interrupt any ongoing - # (child) tasks within that context to be notified of the remote - # error relayed here. - # - # The reason we may want to raise the remote error immediately - # is that there is no guarantee the associated local task(s) - # will attempt to read from any locally opened stream any time - # soon. - # - # NOTE: this only applies when - # ``Portal.open_context()`` has been called since it is assumed - # (currently) that other portal APIs (``Portal.run()``, - # ``.run_in_actor()``) do their own error checking at the point - # of the call and result processing. - log.warning(f'Remote context for {chan.uid}:{cid} errored {msg}') - ctx._maybe_error_from_remote_msg(msg) + log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") + # XXX: we do **not** maintain backpressure and instead + # opt to relay stream overrun errors to the sender. try: - log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") + send_chan.send_nowait(msg) + # if an error is deteced we should always + # expect it to be raised by any context (stream) + # consumer task + await ctx._maybe_raise_from_remote_msg(msg) - # XXX: we do **not** maintain backpressure and instead - # opt to relay stream overrun errors to the sender. - try: - send_chan.send_nowait(msg) - except trio.WouldBlock: + except trio.WouldBlock: + # XXX: always push an error even if the local + # receiver is in overrun state. + await ctx._maybe_raise_from_remote_msg(msg) - # XXX: do we need this? - # if we're trying to push an error but we're in - # an overrun state we'll just get stuck sending - # the error that was sent to us back to it's sender - # instead of it actually being raises in the target - # task.. - # if error: - # raise unpack_error(msg, chan) from None + uid = chan.uid + lines = [ + 'Task context stream was overrun', + f'local task: {cid} @ {self.uid}', + f'remote sender: {uid}', + ] + if not ctx._stream_opened: + lines.insert( + 1, + f'\n*** No stream open on `{self.uid[0]}` side! ***\n' + ) + text = '\n'.join(lines) - uid = chan.uid - - lines = [ - 'Task context stream was overrun', - f'local task: {cid} @ {self.uid}', - f'remote sender: {chan.uid}', - ] - if not ctx._stream_opened: - lines.insert( - 1, - f'\n*** No stream open on `{self.uid[0]}` side! ***\n' - ) - - text = '\n'.join(lines) + if ctx._backpressure: log.warning(text) - if ctx._backpressure: - await send_chan.send(msg) - else: + await send_chan.send(msg) + else: + try: + raise StreamOverrun(text) from None + except StreamOverrun as err: + err_msg = pack_error(err) + err_msg['cid'] = cid try: - raise StreamOverrun(text) from None - except StreamOverrun as err: - err_msg = pack_error(err) - err_msg['cid'] = cid + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? await chan.send(err_msg) - - except trio.BrokenResourceError: - # TODO: what is the right way to handle the case where the - # local task has already sent a 'stop' / StopAsyncInteration - # to the other side but and possibly has closed the local - # feeder mem chan? Do we wait for some kind of ack or just - # let this fail silently and bubble up (currently)? - - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{send_chan} consumer is already closed") + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") def get_context( self, diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 6c57065..05a3073 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -364,33 +364,55 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) - def _maybe_error_from_remote_msg( + async def _maybe_raise_from_remote_msg( self, msg: Dict[str, Any], ) -> None: ''' - Unpack and raise a msg error into the local scope + (Maybe) unpack and raise a msg error into the local scope nursery for this context. Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. ''' - self._error = unpack_error(msg, self.chan) + error = msg.get('error') + if error: + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. + log.error( + f'Remote context error for {self.chan.uid}:{self.cid}:\n' + f'{msg["error"]["tb_str"]}' + ) + # await ctx._maybe_error_from_remote_msg(msg) + self._error = unpack_error(msg, self.chan) - # TODO: tempted to **not** do this by-reraising in a - # nursery and instead cancel a surrounding scope, detect - # the cancellation, then lookup the error that was set? - if self._scope_nursery: + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? + if self._scope_nursery: - async def raiser(): - raise self._error from None + async def raiser(): + raise self._error from None - # from trio.testing import wait_all_tasks_blocked - # await wait_all_tasks_blocked() - if not self._scope_nursery._closed: # type: ignore - self._scope_nursery.start_soon(raiser) + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + if not self._scope_nursery._closed: # type: ignore + self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: ''' From a79cdc7b44f444e012dea4da24ffc1398270be7a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 16:00:21 -0500 Subject: [PATCH 22/31] Make cancel case expect multi-error --- tests/test_context_streams.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py index 3117a45..9c45231 100644 --- a/tests/test_context_streams.py +++ b/tests/test_context_streams.py @@ -10,6 +10,7 @@ from itertools import count import pytest import trio import tractor +from tractor._exceptions import StreamOverrun @tractor.context @@ -74,11 +75,11 @@ async def keep_sending_from_callee( @pytest.mark.parametrize( 'overrun_by', [ - (None, 0, never_open_stream), # use default settings ('caller', 1, never_open_stream), + ('cancel_caller_during_overrun', 1, never_open_stream), ('callee', 0, keep_sending_from_callee), ], - ids='overrun_condition_by={}'.format, + ids='overrun_condition={}'.format, ) def test_one_end_stream_not_opened(overrun_by): ''' @@ -102,18 +103,21 @@ def test_one_end_stream_not_opened(overrun_by): ) as (ctx, sent): assert sent is None - if overrunner in (None, 'caller'): + if 'caller' in overrunner: async with ctx.open_stream() as stream: - for i in range(buf_size - 1): + for i in range(buf_size): + print(f'sending {i}') await stream.send(i) - if overrunner is None: + if 'cancel' in overrunner: # without this we block waiting on the child side await ctx.cancel() else: - await stream.send('yo') + # expect overrun error to be relayed back + # and this sleep interrupted + await trio.sleep_forever() else: # callee overruns caller case so we do nothing here @@ -127,12 +131,29 @@ def test_one_end_stream_not_opened(overrun_by): with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) - assert excinfo.value.type == tractor._exceptions.StreamOverrun + assert excinfo.value.type == StreamOverrun + + elif 'cancel' in overrunner: + with pytest.raises(trio.MultiError) as excinfo: + trio.run(main) + + multierr = excinfo.value + + for exc in multierr.exceptions: + etype = type(exc) + if etype == tractor.RemoteActorError: + assert exc.type == StreamOverrun + else: + assert etype == tractor.ContextCancelled elif overrunner == 'callee': with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) + # TODO: embedded remote errors so that we can verify the source + # error? + # the callee delivers an error which is an overrun wrapped + # in a remote actor error. assert excinfo.value.type == tractor.RemoteActorError else: From 4b40599c48956dec27785fc5888e62a7b8354bc8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 16:29:28 -0500 Subject: [PATCH 23/31] Fix ignore warning log message --- tractor/_actor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index df754c3..742d9ae 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -646,7 +646,8 @@ class Actor: ctx = self._contexts[(uid, cid)] except KeyError: log.warning( - f'Ignoring {msg} for unknwon context with {uid}') + f'Ignoring msg from [no-longer/un]known context with {uid}:' + f'\n{msg}') return send_chan = ctx._send_chan From 63ecae70c4c6ca2bc22e16bd8035a8fe825f8fc0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 16:29:57 -0500 Subject: [PATCH 24/31] Add a basic no-errors-when-backpressure stream test --- tests/test_context_streams.py | 87 ++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py index 9c45231..2ff361e 100644 --- a/tests/test_context_streams.py +++ b/tests/test_context_streams.py @@ -6,6 +6,7 @@ a ``tractor.Context`` beforehand. ''' from itertools import count +from typing import Optional import pytest import trio @@ -59,6 +60,7 @@ async def never_open_stream( async def keep_sending_from_callee( ctx: tractor.Context, + msg_buffer_size: Optional[int] = None, ) -> None: ''' @@ -66,8 +68,11 @@ async def keep_sending_from_callee( ''' await ctx.started() - async with ctx.open_stream() as stream: + async with ctx.open_stream( + msg_buffer_size=msg_buffer_size, + ) as stream: for msg in count(): + print(f'callee sending {msg}') await stream.send(msg) await trio.sleep(0.01) @@ -94,7 +99,7 @@ def test_one_end_stream_not_opened(overrun_by): async def main(): async with tractor.open_nursery() as n: portal = await n.start_actor( - 'starts_no_stream', + entrypoint.__name__, enable_modules=[__name__], ) @@ -158,3 +163,81 @@ def test_one_end_stream_not_opened(overrun_by): else: trio.run(main) + +@tractor.context +async def echo_back_sequence( + + ctx: tractor.Context, + seq: list[int], + msg_buffer_size: Optional[int] = None, + +) -> None: + ''' + Send endlessly on the calleee stream. + + ''' + await ctx.started() + async with ctx.open_stream( + msg_buffer_size=msg_buffer_size, + ) as stream: + + count = 0 + while count < 3: + batch = [] + async for msg in stream: + batch.append(msg) + if batch == seq: + break + + for msg in batch: + print(f'callee sending {msg}') + await stream.send(msg) + + count += 1 + + return 'yo' + + +def test_stream_backpressure(): + ''' + Demonstrate small overruns of each task back and forth + on a stream not raising any errors by default. + + ''' + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'callee_sends_forever', + enable_modules=[__name__], + ) + seq = list(range(3)) + async with portal.open_context( + echo_back_sequence, + seq=seq, + msg_buffer_size=1, + ) as (ctx, sent): + assert sent is None + + async with ctx.open_stream(msg_buffer_size=1) as stream: + count = 0 + while count < 3: + for msg in seq: + print(f'caller sending {msg}') + await stream.send(msg) + await trio.sleep(0.1) + + batch = [] + async for msg in stream: + batch.append(msg) + if batch == seq: + break + + count += 1 + + # here the context should return + assert await ctx.result() == 'yo' + + # cancel the daemon + await portal.cancel_actor() + + trio.run(main) From 52a2b7a5eda4c1e0ca8e7a7b62830861588e3a24 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 16:31:47 -0500 Subject: [PATCH 25/31] Bump windows timeout again --- tests/test_2way.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index 410c299..a497a48 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -71,7 +71,7 @@ def test_simple_context( pointlessly_open_stream, ): - timeout = 1.5 if not platform.system() == 'Windows' else 3 + timeout = 1.5 if not platform.system() == 'Windows' else 4 async def main(): From fd6f4574ceca4f0c2fcc79d940f12d17d785cbad Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 16:38:27 -0500 Subject: [PATCH 26/31] Rename test mod --- .../{test_context_streams.py => test_context_stream_semantics.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_context_streams.py => test_context_stream_semantics.py} (100%) diff --git a/tests/test_context_streams.py b/tests/test_context_stream_semantics.py similarity index 100% rename from tests/test_context_streams.py rename to tests/test_context_stream_semantics.py From efba5229fcdc9b76ef272c8a20b11361e54b5fb4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 16:45:44 -0500 Subject: [PATCH 27/31] Move context-streaming operational tests into one mod --- tests/test_2way.py | 409 +------------------------ tests/test_context_stream_semantics.py | 409 ++++++++++++++++++++++++- 2 files changed, 409 insertions(+), 409 deletions(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index a497a48..db3be4d 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -1,418 +1,11 @@ """ -Bidirectional streaming and context API. +Bidirectional streaming. """ -import platform - import pytest import trio import tractor -from conftest import tractor_test - -# the general stream semantics are -# - normal termination: far end relays a stop message which -# terminates an ongoing ``MsgStream`` iteration -# - cancel termination: context is cancelled on either side cancelling -# the "linked" inter-actor task context - - -_state: bool = False - - -@tractor.context -async def simple_setup_teardown( - - ctx: tractor.Context, - data: int, - block_forever: bool = False, - -) -> None: - - # startup phase - global _state - _state = True - - # signal to parent that we're up - await ctx.started(data + 1) - - try: - if block_forever: - # block until cancelled - await trio.sleep_forever() - else: - return 'yo' - finally: - _state = False - - -async def assert_state(value: bool): - global _state - assert _state == value - - -@pytest.mark.parametrize( - 'error_parent', - [False, ValueError, KeyboardInterrupt], -) -@pytest.mark.parametrize( - 'callee_blocks_forever', - [False, True], - ids=lambda item: f'callee_blocks_forever={item}' -) -@pytest.mark.parametrize( - 'pointlessly_open_stream', - [False, True], - ids=lambda item: f'open_stream={item}' -) -def test_simple_context( - error_parent, - callee_blocks_forever, - pointlessly_open_stream, -): - - timeout = 1.5 if not platform.system() == 'Windows' else 4 - - async def main(): - - with trio.fail_after(timeout): - async with tractor.open_nursery() as nursery: - - portal = await nursery.start_actor( - 'simple_context', - enable_modules=[__name__], - ) - - try: - async with portal.open_context( - simple_setup_teardown, - data=10, - block_forever=callee_blocks_forever, - ) as (ctx, sent): - - assert sent == 11 - - if callee_blocks_forever: - await portal.run(assert_state, value=True) - else: - assert await ctx.result() == 'yo' - - if not error_parent: - await ctx.cancel() - - if pointlessly_open_stream: - async with ctx.open_stream(): - if error_parent: - raise error_parent - - if callee_blocks_forever: - await ctx.cancel() - else: - # in this case the stream will send a - # 'stop' msg to the far end which needs - # to be ignored - pass - else: - if error_parent: - raise error_parent - - finally: - - # after cancellation - if not error_parent: - await portal.run(assert_state, value=False) - - # shut down daemon - await portal.cancel_actor() - - if error_parent: - try: - trio.run(main) - except error_parent: - pass - except trio.MultiError as me: - # XXX: on windows it seems we may have to expect the group error - from tractor._exceptions import is_multi_cancelled - assert is_multi_cancelled(me) - else: - trio.run(main) - - -# basic stream terminations: -# - callee context closes without using stream -# - caller context closes without using stream -# - caller context calls `Context.cancel()` while streaming -# is ongoing resulting in callee being cancelled -# - callee calls `Context.cancel()` while streaming and caller -# sees stream terminated in `RemoteActorError` - -# TODO: future possible features -# - restart request: far end raises `ContextRestart` - - -@tractor.context -async def close_ctx_immediately( - - ctx: tractor.Context, - -) -> None: - - await ctx.started() - global _state - - async with ctx.open_stream(): - pass - - -@tractor_test -async def test_callee_closes_ctx_after_stream_open(): - 'callee context closes without using stream' - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'fast_stream_closer', - enable_modules=[__name__], - ) - - async with portal.open_context( - close_ctx_immediately, - - # flag to avoid waiting the final result - # cancel_on_exit=True, - - ) as (ctx, sent): - - assert sent is None - - with trio.fail_after(0.5): - async with ctx.open_stream() as stream: - - # should fall through since ``StopAsyncIteration`` - # should be raised through translation of - # a ``trio.EndOfChannel`` by - # ``trio.abc.ReceiveChannel.__anext__()`` - async for _ in stream: - assert 0 - else: - - # verify stream is now closed - try: - await stream.receive() - except trio.EndOfChannel: - pass - - # TODO: should be just raise the closed resource err - # directly here to enforce not allowing a re-open - # of a stream to the context (at least until a time of - # if/when we decide that's a good idea?) - try: - async with ctx.open_stream() as stream: - pass - except trio.ClosedResourceError: - pass - - await portal.cancel_actor() - - -@tractor.context -async def expect_cancelled( - - ctx: tractor.Context, - -) -> None: - global _state - _state = True - - await ctx.started() - - try: - async with ctx.open_stream() as stream: - async for msg in stream: - await stream.send(msg) # echo server - - except trio.Cancelled: - # expected case - _state = False - raise - - else: - assert 0, "Wasn't cancelled!?" - - -@pytest.mark.parametrize( - 'use_ctx_cancel_method', - [False, True], -) -@tractor_test -async def test_caller_closes_ctx_after_callee_opens_stream( - use_ctx_cancel_method: bool, -): - 'caller context closes without using stream' - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'ctx_cancelled', - enable_modules=[__name__], - ) - - async with portal.open_context( - expect_cancelled, - ) as (ctx, sent): - await portal.run(assert_state, value=True) - - assert sent is None - - # call cancel explicitly - if use_ctx_cancel_method: - - await ctx.cancel() - - try: - async with ctx.open_stream() as stream: - async for msg in stream: - pass - - except tractor.ContextCancelled: - raise # XXX: must be propagated to __aexit__ - - else: - assert 0, "Should have context cancelled?" - - # channel should still be up - assert portal.channel.connected() - - # ctx is closed here - await portal.run(assert_state, value=False) - - else: - try: - with trio.fail_after(0.2): - await ctx.result() - assert 0, "Callee should have blocked!?" - except trio.TooSlowError: - await ctx.cancel() - try: - async with ctx.open_stream() as stream: - async for msg in stream: - pass - except tractor.ContextCancelled: - pass - else: - assert 0, "Should have received closed resource error?" - - # ctx is closed here - await portal.run(assert_state, value=False) - - # channel should not have been destroyed yet, only the - # inter-actor-task context - assert portal.channel.connected() - - # teardown the actor - await portal.cancel_actor() - - -@tractor_test -async def test_multitask_caller_cancels_from_nonroot_task(): - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'ctx_cancelled', - enable_modules=[__name__], - ) - - async with portal.open_context( - expect_cancelled, - ) as (ctx, sent): - - await portal.run(assert_state, value=True) - assert sent is None - - async with ctx.open_stream() as stream: - - async def send_msg_then_cancel(): - await stream.send('yo') - await portal.run(assert_state, value=True) - await ctx.cancel() - await portal.run(assert_state, value=False) - - async with trio.open_nursery() as n: - n.start_soon(send_msg_then_cancel) - - try: - async for msg in stream: - assert msg == 'yo' - - except tractor.ContextCancelled: - raise # XXX: must be propagated to __aexit__ - - # channel should still be up - assert portal.channel.connected() - - # ctx is closed here - await portal.run(assert_state, value=False) - - # channel should not have been destroyed yet, only the - # inter-actor-task context - assert portal.channel.connected() - - # teardown the actor - await portal.cancel_actor() - - -@tractor.context -async def cancel_self( - - ctx: tractor.Context, - -) -> None: - global _state - _state = True - - await ctx.cancel() - try: - with trio.fail_after(0.1): - await trio.sleep_forever() - - except trio.Cancelled: - raise - - except trio.TooSlowError: - # should never get here - assert 0 - - -@tractor_test -async def test_callee_cancels_before_started(): - ''' - Callee calls `Context.cancel()` while streaming and caller - sees stream terminated in `ContextCancelled`. - - ''' - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'cancels_self', - enable_modules=[__name__], - ) - try: - - async with portal.open_context( - cancel_self, - ) as (ctx, sent): - async with ctx.open_stream(): - - await trio.sleep_forever() - - # raises a special cancel signal - except tractor.ContextCancelled as ce: - ce.type == trio.Cancelled - - # teardown the actor - await portal.cancel_actor() - @tractor.context async def simple_rpc( diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 2ff361e..373c683 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -6,6 +6,7 @@ a ``tractor.Context`` beforehand. ''' from itertools import count +import platform from typing import Optional import pytest @@ -13,6 +14,411 @@ import trio import tractor from tractor._exceptions import StreamOverrun +from conftest import tractor_test + +# the general stream semantics are +# - normal termination: far end relays a stop message which +# terminates an ongoing ``MsgStream`` iteration +# - cancel termination: context is cancelled on either side cancelling +# the "linked" inter-actor task context + + +_state: bool = False + + +@tractor.context +async def simple_setup_teardown( + + ctx: tractor.Context, + data: int, + block_forever: bool = False, + +) -> None: + + # startup phase + global _state + _state = True + + # signal to parent that we're up + await ctx.started(data + 1) + + try: + if block_forever: + # block until cancelled + await trio.sleep_forever() + else: + return 'yo' + finally: + _state = False + + +async def assert_state(value: bool): + global _state + assert _state == value + + +@pytest.mark.parametrize( + 'error_parent', + [False, ValueError, KeyboardInterrupt], +) +@pytest.mark.parametrize( + 'callee_blocks_forever', + [False, True], + ids=lambda item: f'callee_blocks_forever={item}' +) +@pytest.mark.parametrize( + 'pointlessly_open_stream', + [False, True], + ids=lambda item: f'open_stream={item}' +) +def test_simple_context( + error_parent, + callee_blocks_forever, + pointlessly_open_stream, +): + + timeout = 1.5 if not platform.system() == 'Windows' else 4 + + async def main(): + + with trio.fail_after(timeout): + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor( + 'simple_context', + enable_modules=[__name__], + ) + + try: + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=callee_blocks_forever, + ) as (ctx, sent): + + assert sent == 11 + + if callee_blocks_forever: + await portal.run(assert_state, value=True) + else: + assert await ctx.result() == 'yo' + + if not error_parent: + await ctx.cancel() + + if pointlessly_open_stream: + async with ctx.open_stream(): + if error_parent: + raise error_parent + + if callee_blocks_forever: + await ctx.cancel() + else: + # in this case the stream will send a + # 'stop' msg to the far end which needs + # to be ignored + pass + else: + if error_parent: + raise error_parent + + finally: + + # after cancellation + if not error_parent: + await portal.run(assert_state, value=False) + + # shut down daemon + await portal.cancel_actor() + + if error_parent: + try: + trio.run(main) + except error_parent: + pass + except trio.MultiError as me: + # XXX: on windows it seems we may have to expect the group error + from tractor._exceptions import is_multi_cancelled + assert is_multi_cancelled(me) + else: + trio.run(main) + + +# basic stream terminations: +# - callee context closes without using stream +# - caller context closes without using stream +# - caller context calls `Context.cancel()` while streaming +# is ongoing resulting in callee being cancelled +# - callee calls `Context.cancel()` while streaming and caller +# sees stream terminated in `RemoteActorError` + +# TODO: future possible features +# - restart request: far end raises `ContextRestart` + + +@tractor.context +async def close_ctx_immediately( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + global _state + + async with ctx.open_stream(): + pass + + +@tractor_test +async def test_callee_closes_ctx_after_stream_open(): + 'callee context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'fast_stream_closer', + enable_modules=[__name__], + ) + + async with portal.open_context( + close_ctx_immediately, + + # flag to avoid waiting the final result + # cancel_on_exit=True, + + ) as (ctx, sent): + + assert sent is None + + with trio.fail_after(0.5): + async with ctx.open_stream() as stream: + + # should fall through since ``StopAsyncIteration`` + # should be raised through translation of + # a ``trio.EndOfChannel`` by + # ``trio.abc.ReceiveChannel.__anext__()`` + async for _ in stream: + assert 0 + else: + + # verify stream is now closed + try: + await stream.receive() + except trio.EndOfChannel: + pass + + # TODO: should be just raise the closed resource err + # directly here to enforce not allowing a re-open + # of a stream to the context (at least until a time of + # if/when we decide that's a good idea?) + try: + async with ctx.open_stream() as stream: + pass + except trio.ClosedResourceError: + pass + + await portal.cancel_actor() + + +@tractor.context +async def expect_cancelled( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.started() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) # echo server + + except trio.Cancelled: + # expected case + _state = False + raise + + else: + assert 0, "Wasn't cancelled!?" + + +@pytest.mark.parametrize( + 'use_ctx_cancel_method', + [False, True], +) +@tractor_test +async def test_caller_closes_ctx_after_callee_opens_stream( + use_ctx_cancel_method: bool, +): + 'caller context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + await portal.run(assert_state, value=True) + + assert sent is None + + # call cancel explicitly + if use_ctx_cancel_method: + + await ctx.cancel() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + else: + assert 0, "Should have context cancelled?" + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + else: + try: + with trio.fail_after(0.2): + await ctx.result() + assert 0, "Callee should have blocked!?" + except trio.TooSlowError: + await ctx.cancel() + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + except tractor.ContextCancelled: + pass + else: + assert 0, "Should have received closed resource error?" + + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor_test +async def test_multitask_caller_cancels_from_nonroot_task(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + + await portal.run(assert_state, value=True) + assert sent is None + + async with ctx.open_stream() as stream: + + async def send_msg_then_cancel(): + await stream.send('yo') + await portal.run(assert_state, value=True) + await ctx.cancel() + await portal.run(assert_state, value=False) + + async with trio.open_nursery() as n: + n.start_soon(send_msg_then_cancel) + + try: + async for msg in stream: + assert msg == 'yo' + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor.context +async def cancel_self( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.cancel() + try: + with trio.fail_after(0.1): + await trio.sleep_forever() + + except trio.Cancelled: + raise + + except trio.TooSlowError: + # should never get here + assert 0 + + +@tractor_test +async def test_callee_cancels_before_started(): + ''' + Callee calls `Context.cancel()` while streaming and caller + sees stream terminated in `ContextCancelled`. + + ''' + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'cancels_self', + enable_modules=[__name__], + ) + try: + + async with portal.open_context( + cancel_self, + ) as (ctx, sent): + async with ctx.open_stream(): + + await trio.sleep_forever() + + # raises a special cancel signal + except tractor.ContextCancelled as ce: + ce.type == trio.Cancelled + + # teardown the actor + await portal.cancel_actor() + @tractor.context async def really_started( @@ -115,7 +521,7 @@ def test_one_end_stream_not_opened(overrun_by): print(f'sending {i}') await stream.send(i) - if 'cancel' in overrunner: + if 'cancel' in overrunner: # without this we block waiting on the child side await ctx.cancel() @@ -164,6 +570,7 @@ def test_one_end_stream_not_opened(overrun_by): else: trio.run(main) + @tractor.context async def echo_back_sequence( From 4856285dee791fadb935f9e2fb2c0c6beb3e55c6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 17:04:17 -0500 Subject: [PATCH 28/31] Add back broken send chan ignore block --- tractor/_actor.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 742d9ae..dac2105 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -663,6 +663,18 @@ class Actor: # consumer task await ctx._maybe_raise_from_remote_msg(msg) + except trio.BrokenResourceError: + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? + + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") + return + except trio.WouldBlock: # XXX: always push an error even if the local # receiver is in overrun state. @@ -691,16 +703,11 @@ class Actor: err_msg = pack_error(err) err_msg['cid'] = cid try: - # TODO: what is the right way to handle the case where the - # local task has already sent a 'stop' / StopAsyncInteration - # to the other side but and possibly has closed the local - # feeder mem chan? Do we wait for some kind of ack or just - # let this fail silently and bubble up (currently)? await chan.send(err_msg) except trio.BrokenResourceError: # XXX: local consumer has closed their side # so cancel the far end streaming task - log.warning(f"{send_chan} consumer is already closed") + log.warning(f"{chan} is already closed") def get_context( self, From df59071747f808d688479ac9c0dde7d9ed375a5c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 18:07:14 -0500 Subject: [PATCH 29/31] Bleh cast to list for `msgpack` --- tests/test_context_stream_semantics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 373c683..b57cec7 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -588,6 +588,7 @@ async def echo_back_sequence( msg_buffer_size=msg_buffer_size, ) as stream: + seq = list(seq) # bleh, `msgpack`... count = 0 while count < 3: batch = [] From 703dee8a591ca0a9092f96329b4bdcc7532a2de1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Dec 2021 09:48:35 -0500 Subject: [PATCH 30/31] Add stream open before started, detailed semantics comment --- tests/test_context_stream_semantics.py | 131 ++++++++++++++++++------- 1 file changed, 97 insertions(+), 34 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index b57cec7..4c74a9a 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -16,16 +16,99 @@ from tractor._exceptions import StreamOverrun from conftest import tractor_test -# the general stream semantics are -# - normal termination: far end relays a stop message which -# terminates an ongoing ``MsgStream`` iteration -# - cancel termination: context is cancelled on either side cancelling -# the "linked" inter-actor task context +# ``Context`` semantics are as follows, +# ------------------------------------ + +# - standard setup/teardown: +# ``Portal.open_context()`` starts a new +# remote task context in another actor. The target actor's task must +# call ``Context.started()`` to unblock this entry on the caller side. +# the callee task executes until complete and returns a final value +# which is delivered to the caller side and retreived via +# ``Context.result()``. + +# - cancel termination: +# context can be cancelled on either side where either end's task can +# call ``Context.cancel()`` which raises a local ``trio.Cancelled`` +# and sends a task cancel request to the remote task which in turn +# raises a ``trio.Cancelled`` in that scope, catches it, and re-raises +# as ``ContextCancelled``. This is then caught by +# ``Portal.open_context()``'s exit and we get a graceful termination +# of the linked tasks. + +# - error termination: +# error is caught after all context-cancel-scope tasks are cancelled +# via regular ``trio`` cancel scope semantics, error is sent to other +# side and unpacked as a `RemoteActorError`. + + +# ``Context.open_stream() as stream: MsgStream:`` msg semantics are: +# ----------------------------------------------------------------- + +# - either side can ``.send()`` which emits a 'yield' msgs and delivers +# a value to the a ``MsgStream.receive()`` call. + +# - stream closure: one end relays a 'stop' message which terminates an +# ongoing ``MsgStream`` iteration. + +# - cancel/error termination: as per the context semantics above but +# with implicit stream closure on the cancelling end. _state: bool = False +@tractor.context +async def too_many_starteds( + ctx: tractor.Context, +) -> None: + ''' + Call ``Context.started()`` more then once (an error). + + ''' + await ctx.started() + try: + await ctx.started() + except RuntimeError: + raise + + +@tractor.context +async def not_started_but_stream_opened( + ctx: tractor.Context, +) -> None: + ''' + Enter ``Context.open_stream()`` without calling ``.started()``. + + ''' + try: + async with ctx.open_stream(): + assert 0 + except RuntimeError: + raise + + +@pytest.mark.parametrize( + 'target', + [too_many_starteds, not_started_but_stream_opened], + ids='misuse_type={}'.format, +) +def test_started_misuse(target): + + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + target.__name__, + enable_modules=[__name__], + ) + + async with portal.open_context(target) as (ctx, sent): + await trio.sleep(1) + + with pytest.raises(tractor.RemoteActorError): + trio.run(main) + + @tractor.context async def simple_setup_teardown( @@ -378,10 +461,18 @@ async def cancel_self( _state = True await ctx.cancel() + + # should inline raise immediately + try: + async with ctx.open_stream(): + pass + except ContextCancelled: + pass + + # check a real ``trio.Cancelled`` is raised on a checkpoint try: with trio.fail_after(0.1): await trio.sleep_forever() - except trio.Cancelled: raise @@ -420,34 +511,6 @@ async def test_callee_cancels_before_started(): await portal.cancel_actor() -@tractor.context -async def really_started( - ctx: tractor.Context, -) -> None: - await ctx.started() - try: - await ctx.started() - except RuntimeError: - raise - - -def test_started_called_more_then_once(): - - async def main(): - async with tractor.open_nursery() as n: - portal = await n.start_actor( - 'too_much_starteds', - enable_modules=[__name__], - ) - - async with portal.open_context(really_started) as (ctx, sent): - await trio.sleep(1) - # pass - - with pytest.raises(tractor.RemoteActorError): - trio.run(main) - - @tractor.context async def never_open_stream( From faaecbf810e137b27ef00a8d363c7d3c3bdb0de6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Dec 2021 11:11:50 -0500 Subject: [PATCH 31/31] Add nooz --- newsfragments/261.misc.rst | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 newsfragments/261.misc.rst diff --git a/newsfragments/261.misc.rst b/newsfragments/261.misc.rst new file mode 100644 index 0000000..6e4a934 --- /dev/null +++ b/newsfragments/261.misc.rst @@ -0,0 +1,37 @@ +Add cross-actor-task ``Context`` oriented error relay, a new +stream overrun error-signal ``StreamOverrun``, and support +disabling ``MsgStream`` backpressure as the default before a stream +is opened or by choice of the user. + +We added stricter semantics around ``tractor.Context.open_stream():`` +particularly to do with streams which are only opened at one end. +Previously, if only one end opened a stream there was no way for that +sender to know if msgs are being received until first, the feeder mem +chan on the receiver side hit a backpressure state and then that +condition delayed its msg loop processing task to eventually create +backpressure on the associated IPC transport. This is non-ideal in the +case where the receiver side never opened a stream by mistake since it +results in silent block of the sender and no adherence to the underlying +mem chan buffer size settings (which is still unsolved btw). + +To solve this we add non-backpressure style message pushing inside +``Actor._push_result()`` by default and only use the backpressure +``trio.MemorySendChannel.send()`` call **iff** the local end of the +context has entered ``Context.open_stream():``. This way if the stream +was never opened but the mem chan is overrun, we relay back to the +sender a (new exception) ``SteamOverrun`` error which is raised in the +sender's scope with a special error message about the stream never +having been opened. Further, this behaviour (non-backpressure style +where senders can expect an error on overruns) can now be enabled with +``.open_stream(backpressure=False)`` and the underlying mem chan size +can be specified with a kwarg ``msg_buffer_size: int``. + +Further bug fixes and enhancements in this changeset include: +- fix a race we were ignoring where if the callee task opened a context + it could enter ``Context.open_stream()`` before calling + ``.started()``. +- Disallow calling ``Context.started()`` more then once. +- Enable ``Context`` linked tasks error relaying via the new + ``Context._maybe_raise_from_remote_msg()`` which (for now) uses + a simple ``trio.Nursery.start_soon()`` to raise the error via closure + in the local scope.