From e3955bb62bae4ca87b4ba2a98f233612b0caca5e Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 1 May 2021 15:10:03 -0400
Subject: [PATCH 01/50] Add initial bi-directional streaming

This mostly adds the api described in
https://github.com/goodboy/tractor/issues/53#issuecomment-806258798

The first draft summary:
- formalize bidir streaming using the `trio.Channel` style interface
  which we derive as a `MsgStream` type.
- add `Portal.open_context()` which provides a `trio.Nursery.start()`
  remote task invocation style for setting up and tearing down task
  contexts in remote actors.
- add a distinct `'started'` message to the ipc protocol to facilitate
  `Context.started()` with a first return value.
- for our `ReceiveMsgStream` type, don't cancel the remote task in
  `.aclose()`; this is now done explicitly by the surrounding `Context`
  usage: `Context.cancel()`.
- streams in either direction still use a `'yield'` message keeping the
  proto mostly symmetric without having to worry about which side is the
  caller / portal opener.
- subtlety: only allow sending a `'stop'` message during a 2-way
  streaming context from `ReceiveMsgStream.aclose()`, detailed comment
  with explanation is included.

Relates to #53
---
 tractor/_actor.py     |  95 ++++++---
 tractor/_portal.py    |  70 +++++--
 tractor/_streaming.py | 444 ++++++++++++++++++++++++++----------------
 3 files changed, 406 insertions(+), 203 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 95e8592..0169784 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -14,6 +14,7 @@ from types import ModuleType
 import sys
 import os
 from contextlib import ExitStack
+import warnings

 import trio  # type: ignore
 from trio_typing import TaskStatus
@@ -58,13 +59,37 @@ async def _invoke(
     treat_as_gen = False
     cs = None
     cancel_scope = trio.CancelScope()
-    ctx = Context(chan, cid, cancel_scope)
+    ctx = Context(chan, cid, _cancel_scope=cancel_scope)
+    context = False

     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
+        sig = inspect.signature(func)
+        params = sig.parameters
+
+        # compat with old api
         kwargs['ctx'] = ctx
+
+        if 'ctx' in params:
+            warnings.warn(
+                "`@tractor.stream decorated funcs should now declare "
+                "a `stream` arg, `ctx` is now designated for use with "
+                "@tractor.context",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+
+        elif 'stream' in params:
+            assert 'stream' in params
+            kwargs['stream'] = ctx

         treat_as_gen = True
+
+    elif getattr(func, '_tractor_context_function', False):
+        # handle decorated ``@tractor.context`` async function
+        kwargs['ctx'] = ctx
+        context = True
+
     # errors raised inside this block are propagated back to caller
     try:
         if not (
@@ -102,26 +127,41 @@
             # `StopAsyncIteration` system here for returning a final
             # value if desired
             await chan.send({'stop': True, 'cid': cid})
+
+        # one way @stream func that gets treated like an async gen
+        elif treat_as_gen:
+            await chan.send({'functype': 'asyncgen', 'cid': cid})
+            # XXX: the async-func may spawn further tasks which push
+            # back values like an async-generator would but must
+            # manually construct the response dict-packet-responses as
+            # above
+            with cancel_scope as cs:
+                task_status.started(cs)
+                await coro
+
+            if not cs.cancelled_caught:
+                # task was not cancelled so we can instruct the
+                # far end async gen to tear down
+                await chan.send({'stop': True, 'cid': cid})
+
+        elif context:
+            # context func with support for bi-dir streaming
+            await chan.send({'functype': 'context', 'cid': cid})
+
+            with cancel_scope as cs:
+                task_status.started(cs)
+                await chan.send({'return': await coro, 'cid': cid})
+
+            # if cs.cancelled_caught:
+            #     # task was cancelled so relay the cancel to the caller
+            #     await chan.send({'return': await coro, 'cid': cid})
+
         else:
-            if treat_as_gen:
-                await chan.send({'functype': 'asyncgen', 'cid': cid})
-                # XXX: the async-func may spawn further tasks which push
-                # back values like an async-generator would but must
-                # manually construct the response dict-packet-responses as
-                # above
-                with cancel_scope as cs:
-                    task_status.started(cs)
-                    await coro
-                if not cs.cancelled_caught:
-                    # task was not cancelled so we can instruct the
-                    # far end async gen to tear down
-                    await chan.send({'stop': True, 'cid': cid})
-            else:
-                # regular async function
-                await chan.send({'functype': 'asyncfunc', 'cid': cid})
-                with cancel_scope as cs:
-                    task_status.started(cs)
-                    await chan.send({'return': await coro, 'cid': cid})
+            # regular async function
+            await chan.send({'functype': 'asyncfunc', 'cid': cid})
+            with cancel_scope as cs:
+                task_status.started(cs)
+                await chan.send({'return': await coro, 'cid': cid})

     except (Exception, trio.MultiError) as err:

@@ -416,10 +456,10 @@ class Actor:
         send_chan, recv_chan = self._cids2qs[(actorid, cid)]
         assert send_chan.cid == cid  # type: ignore

-        if 'stop' in msg:
-            log.debug(f"{send_chan} was terminated at remote end")
-            # indicate to consumer that far end has stopped
-            return await send_chan.aclose()
+        # if 'stop' in msg:
+        #     log.debug(f"{send_chan} was terminated at remote end")
+        #     # indicate to consumer that far end has stopped
+        #     return await send_chan.aclose()

         try:
             log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
@@ -427,6 +467,12 @@
             await send_chan.send(msg)

         except trio.BrokenResourceError:
+            # TODO: what is the right way to handle the case where the
+            # local task has already sent a 'stop' / StopAsyncIteration
+            # to the other side and possibly has closed the local
+            # feeder mem chan? Do we wait for some kind of ack or just
+            # let this fail silently and bubble up (currently)?
+
             # XXX: local consumer has closed their side
             # so cancel the far end streaming task
             log.warning(f"{send_chan} consumer is already closed")
@@ -506,6 +552,7 @@
                     if cid:
                         # deliver response to local caller/waiter
                         await self._push_result(chan, cid, msg)
+
                         log.debug(
                             f"Waiting on next msg for {chan} from {chan.uid}")
                         continue

diff --git a/tractor/_portal.py b/tractor/_portal.py
index d82e040..e13792b 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -312,11 +312,20 @@ class Portal:

         ctx = Context(self.channel, cid, _portal=self)
         try:
-            async with ReceiveMsgStream(ctx, recv_chan, self) as rchan:
+            # deliver receive only stream
+            async with ReceiveMsgStream(ctx, recv_chan) as rchan:
                 self._streams.add(rchan)
                 yield rchan

+        finally:
+            # cancel the far end task on consumer close
+            # NOTE: this is a special case since we assume that if using
+            # this ``.open_stream_from()`` api, the stream is a one
+            # time use and we couple the far end task's lifetime to
+            # the consumer's scope; we don't ever send a `'stop'`
+            # message right now since there shouldn't be a reason to
+            # stop and restart the stream, right?
try: await ctx.cancel() except trio.ClosedResourceError: @@ -326,16 +335,55 @@ class Portal: self._streams.remove(rchan) - # @asynccontextmanager - # async def open_context( - # self, - # func: Callable, - # **kwargs, - # ) -> Context: - # # TODO - # elif resptype == 'context': # context manager style setup/teardown - # # TODO likely not here though - # raise NotImplementedError + @asynccontextmanager + async def open_context( + self, + func: Callable, + **kwargs, + ) -> Context: + """Open an inter-actor task context. + + This is a synchronous API which allows for deterministic + setup/teardown of a remote task. The yielded ``Context`` further + allows for opening bidirectional streams - see + ``Context.open_stream()``. + + """ + # conduct target func method structural checks + if not inspect.iscoroutinefunction(func) and ( + getattr(func, '_tractor_contex_function', False) + ): + raise TypeError( + f'{func} must be an async generator function!') + + fn_mod_path, fn_name = func_deats(func) + + cid, recv_chan, functype, first_msg = await self._submit( + fn_mod_path, fn_name, kwargs) + + assert functype == 'context' + + msg = await recv_chan.receive() + try: + # the "first" value here is delivered by the callee's + # ``Context.started()`` call. + first = msg['started'] + + except KeyError: + assert msg.get('cid'), ("Received internal error at context?") + + if msg.get('error'): + # raise the error message + raise unpack_error(msg, self.channel) + else: + raise + try: + ctx = Context(self.channel, cid, _portal=self) + yield ctx, first + + finally: + await recv_chan.aclose() + await ctx.cancel() @dataclass diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 0836f4e..1c46801 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,19 +1,195 @@ import inspect -from contextlib import contextmanager # , asynccontextmanager +from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass -from typing import Any, Iterator, Optional +from typing import Any, Iterator, Optional, Callable import warnings import trio from ._ipc import Channel from ._exceptions import unpack_error +from ._state import current_actor from .log import get_logger log = get_logger(__name__) +# TODO: generic typing like trio's receive channel +# but with msgspec messages? +# class ReceiveChannel(AsyncResource, Generic[ReceiveType]): + + +class ReceiveMsgStream(trio.abc.ReceiveChannel): + """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with + special behaviour for signalling stream termination across an + inter-actor ``Channel``. This is the type returned to a local task + which invoked a remote streaming function using `Portal.run()`. 
+ + Termination rules: + - if the local task signals stop iteration a cancel signal is + relayed to the remote task indicating to stop streaming + - if the remote task signals the end of a stream, raise a + ``StopAsyncIteration`` to terminate the local ``async for`` + + """ + def __init__( + self, + ctx: 'Context', # typing: ignore # noqa + rx_chan: trio.abc.ReceiveChannel, + ) -> None: + self._ctx = ctx + self._rx_chan = rx_chan + self._shielded = False + + # delegate directly to underlying mem channel + def receive_nowait(self): + return self._rx_chan.receive_nowait() + + async def receive(self): + try: + msg = await self._rx_chan.receive() + return msg['yield'] + + except KeyError: + # internal error should never get here + assert msg.get('cid'), ("Received internal error at portal?") + + # TODO: handle 2 cases with 3.10 match syntax + # - 'stop' + # - 'error' + # possibly just handle msg['stop'] here! + + if msg.get('stop'): + log.debug(f"{self} was stopped at remote end") + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + await self.aclose() + raise trio.EndOfChannel + + # TODO: test that shows stream raising an expected error!!! + elif msg.get('error'): + # raise the error message + raise unpack_error(msg, self._ctx.chan) + + else: + raise + + except (trio.ClosedResourceError, StopAsyncIteration): + # XXX: this indicates that a `stop` message was + # sent by the far side of the underlying channel. + # Currently this is triggered by calling ``.aclose()`` on + # the send side of the channel inside + # ``Actor._push_result()``, but maybe it should be put here? + # to avoid exposing the internal mem chan closing mechanism? + # in theory we could instead do some flushing of the channel + # if needed to ensure all consumers are complete before + # triggering closure too early? + + # Locally, we want to close this stream gracefully, by + # terminating any local consumers tasks deterministically. + # We **don't** want to be closing this send channel and not + # relaying a final value to remaining consumers who may not + # have been scheduled to receive it yet? + + # lots of testing to do here + + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + await self.aclose() + # await self._ctx.send_stop() + raise StopAsyncIteration + + except trio.Cancelled: + # relay cancels to the remote task + await self.aclose() + raise + + @contextmanager + def shield( + self + ) -> Iterator['ReceiveMsgStream']: # noqa + """Shield this stream's underlying channel such that a local consumer task + can be cancelled (and possibly restarted) using ``trio.Cancelled``. + + """ + self._shielded = True + yield self + self._shielded = False + + async def aclose(self): + """Cancel associated remote actor task and local memory channel + on close. 
+ + """ + # TODO: proper adherance to trio's `.aclose()` semantics: + # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose + rx_chan = self._rx_chan + + if rx_chan._closed: + log.warning(f"{self} is already closed") + return + + # TODO: broadcasting to multiple consumers + # stats = rx_chan.statistics() + # if stats.open_receive_channels > 1: + # # if we've been cloned don't kill the stream + # log.debug( + # "there are still consumers running keeping stream alive") + # return + + if self._shielded: + log.warning(f"{self} is shielded, portal channel being kept alive") + return + + # NOTE: this is super subtle IPC messaging stuff: + # Relay stop iteration to far end **iff** we're + # in bidirectional mode. If we're only streaming + # *from* one side then that side **won't** have an + # entry in `Actor._cids2qs` (maybe it should though?). + # So any `yield` or `stop` msgs sent from the caller side + # will cause key errors on the callee side since there is + # no entry for a local feeder mem chan since the callee task + # isn't expecting messages to be sent by the caller. + # Thus, we must check that this context DOES NOT + # have a portal reference to ensure this is indeed the callee + # side and can relay a 'stop'. In the bidirectional case, + # `Context.open_stream()` will create the `Actor._cids2qs` + # entry from a call to `Actor.get_memchans()`. + if not self._ctx._portal: + # only for 2 way streams can we can send + # stop from the caller side + await self._ctx.send_stop() + + # close the local mem chan + rx_chan.close() + + # TODO: but make it broadcasting to consumers + # def clone(self): + # """Clone this receive channel allowing for multi-task + # consumption from the same channel. + + # """ + # return ReceiveStream( + # self._cid, + # self._rx_chan.clone(), + # self._portal, + # ) + + +class MsgStream(ReceiveMsgStream, trio.abc.Channel): + """ + Bidirectional message stream for use within an inter-actor actor + ``Context```. + + """ + async def send( + self, + data: Any + ) -> None: + await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) + + @dataclass(frozen=True) class Context: """An IAC (inter-actor communication) context. @@ -31,6 +207,10 @@ class Context: chan: Channel cid: str + # TODO: should we have seperate types for caller vs. callee + # side contexts? The caller always opens a portal whereas the callee + # is always responding back through a context-stream + # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa @@ -57,46 +237,97 @@ class Context: timeout quickly to sidestep 2-generals... """ - assert self._portal, ( - "No portal found, this is likely a callee side context") + if self._portal: # caller side: + if not self._portal: + raise RuntimeError( + "No portal found, this is likely a callee side context" + ) - cid = self.cid - with trio.move_on_after(0.5) as cs: - cs.shield = True - log.warning( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") - - # NOTE: we're telling the far end actor to cancel a task - # corresponding to *this actor*. The far end local channel - # instance is passed to `Actor._cancel_task()` implicitly. - await self._portal.run_from_ns('self', '_cancel_task', cid=cid) - - if cs.cancelled_caught: - # XXX: there's no way to know if the remote task was indeed - # cancelled in the case where the connection is broken or - # some other network error occurred. 
- if not self._portal.channel.connected(): + cid = self.cid + with trio.move_on_after(0.5) as cs: + cs.shield = True log.warning( - "May have failed to cancel remote task " - f"{cid} for {self._portal.channel.uid}") + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run_from_ns('self', '_cancel_task', cid=cid) + + if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. + # if not self._portal.channel.connected(): + if not self.chan.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self._portal.channel.uid}") + else: + # ensure callee side + assert self._cancel_scope + # TODO: should we have an explicit cancel message + # or is relaying the local `trio.Cancelled` as an + # {'error': trio.Cancelled, cid: "blah"} enough? + # This probably gets into the discussion in + # https://github.com/goodboy/tractor/issues/36 + self._cancel_scope.cancel() + + # TODO: do we need a restart api? # async def restart(self) -> None: - # # TODO # pass - # @asynccontextmanager - # async def open_stream( - # self, - # ) -> AsyncContextManager: - # # TODO - # pass + @asynccontextmanager + async def open_stream( + self, + ) -> MsgStream: + # TODO + + actor = current_actor() + + # here we create a mem chan that corresponds to the + # far end caller / callee. + + # NOTE: in one way streaming this only happens on the + # caller side inside `Actor.send_cmd()` so if you try + # to send a stop from the caller to the callee in the + # single-direction-stream case you'll get a lookup error + # currently. + _, recv_chan = actor.get_memchans( + self.chan.uid, + self.cid + ) + + async with MsgStream(ctx=self, rx_chan=recv_chan) as rchan: + + if self._portal: + self._portal._streams.add(rchan) + + try: + yield rchan + + finally: + await self.send_stop() + if self._portal: + self._portal._streams.add(rchan) + + async def started(self, value: Any) -> None: + + if self._portal: + raise RuntimeError( + f"Caller side context {self} can not call started!") + + await self.chan.send({'started': value, 'cid': self.cid}) -def stream(func): +def stream(func: Callable) -> Callable: """Mark an async function as a streaming routine with ``@stream``. + """ + # annotate func._tractor_stream_function = True + sig = inspect.signature(func) params = sig.parameters if 'stream' not in params and 'ctx' in params: @@ -114,147 +345,24 @@ def stream(func): ): raise TypeError( "The first argument to the stream function " - f"{func.__name__} must be `ctx: tractor.Context`" + f"{func.__name__} must be `ctx: tractor.Context` " + "(Or ``to_trio`` if using ``asyncio`` in guest mode)." ) return func -class ReceiveMsgStream(trio.abc.ReceiveChannel): - """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with - special behaviour for signalling stream termination across an - inter-actor ``Channel``. This is the type returned to a local task - which invoked a remote streaming function using `Portal.run()`. 
- - Termination rules: - - if the local task signals stop iteration a cancel signal is - relayed to the remote task indicating to stop streaming - - if the remote task signals the end of a stream, raise a - ``StopAsyncIteration`` to terminate the local ``async for`` +def context(func: Callable) -> Callable: + """Mark an async function as a streaming routine with ``@context``. """ - def __init__( - self, - ctx: Context, - rx_chan: trio.abc.ReceiveChannel, - portal: 'Portal', # type: ignore # noqa - ) -> None: - self._ctx = ctx - self._rx_chan = rx_chan - self._portal = portal - self._shielded = False + # annotate + func._tractor_context_function = True - # delegate directly to underlying mem channel - def receive_nowait(self): - return self._rx_chan.receive_nowait() - - async def receive(self): - try: - msg = await self._rx_chan.receive() - return msg['yield'] - - except KeyError: - # internal error should never get here - assert msg.get('cid'), ("Received internal error at portal?") - - # TODO: handle 2 cases with 3.10 match syntax - # - 'stop' - # - 'error' - # possibly just handle msg['stop'] here! - - # TODO: test that shows stream raising an expected error!!! - if msg.get('error'): - # raise the error message - raise unpack_error(msg, self._portal.channel) - - except (trio.ClosedResourceError, StopAsyncIteration): - # XXX: this indicates that a `stop` message was - # sent by the far side of the underlying channel. - # Currently this is triggered by calling ``.aclose()`` on - # the send side of the channel inside - # ``Actor._push_result()``, but maybe it should be put here? - # to avoid exposing the internal mem chan closing mechanism? - # in theory we could instead do some flushing of the channel - # if needed to ensure all consumers are complete before - # triggering closure too early? - - # Locally, we want to close this stream gracefully, by - # terminating any local consumers tasks deterministically. - # We **don't** want to be closing this send channel and not - # relaying a final value to remaining consumers who may not - # have been scheduled to receive it yet? - - # lots of testing to do here - - # when the send is closed we assume the stream has - # terminated and signal this local iterator to stop - await self.aclose() - raise StopAsyncIteration - - except trio.Cancelled: - # relay cancels to the remote task - await self.aclose() - raise - - @contextmanager - def shield( - self - ) -> Iterator['ReceiveMsgStream']: # noqa - """Shield this stream's underlying channel such that a local consumer task - can be cancelled (and possibly restarted) using ``trio.Cancelled``. - - """ - self._shielded = True - yield self - self._shielded = False - - async def aclose(self): - """Cancel associated remote actor task and local memory channel - on close. - """ - rx_chan = self._rx_chan - - if rx_chan._closed: - log.warning(f"{self} is already closed") - return - - # stats = rx_chan.statistics() - # if stats.open_receive_channels > 1: - # # if we've been cloned don't kill the stream - # log.debug( - # "there are still consumers running keeping stream alive") - # return - - if self._shielded: - log.warning(f"{self} is shielded, portal channel being kept alive") - return - - # close the local mem chan - rx_chan.close() - - # cancel surrounding IPC context - await self._ctx.cancel() - - # TODO: but make it broadcasting to consumers - # def clone(self): - # """Clone this receive channel allowing for multi-task - # consumption from the same channel. 
- - # """ - # return ReceiveStream( - # self._cid, - # self._rx_chan.clone(), - # self._portal, - # ) - - -# class MsgStream(ReceiveMsgStream, trio.abc.Channel): -# """ -# Bidirectional message stream for use within an inter-actor actor -# ``Context```. - -# """ -# async def send( -# self, -# data: Any -# ) -> None: -# await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) + sig = inspect.signature(func) + params = sig.parameters + if 'ctx' not in params: + raise TypeError( + "The first argument to the context function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) + return func From 14114547e8f01253cbbb8e8638b74fc796b6ca66 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 2 May 2021 14:12:35 -0400 Subject: [PATCH 02/50] Expose `@context` decorator at top level --- tractor/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 7e6f800..25b56b1 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -5,7 +5,7 @@ tractor: An actor model micro-framework built on from trio import MultiError from ._ipc import Channel -from ._streaming import Context, stream +from ._streaming import Context, stream, context from ._discovery import get_arbiter, find_actor, wait_for_actor from ._trionics import open_nursery from ._state import current_actor, is_root_process @@ -33,7 +33,7 @@ __all__ = [ 'run', 'run_daemon', 'stream', - 'wait_for_actor', + 'context', 'to_asyncio', 'wait_for_actor', ] From 4846c6d498ab80c62debec9e502f737f6baf2436 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 2 May 2021 14:13:23 -0400 Subject: [PATCH 03/50] Cancel scope on stream consumer completion --- tests/test_streaming.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 8d8169e..43acb95 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -338,6 +338,8 @@ async def test_respawn_consumer_task( print("all values streamed, BREAKING") break + cs.cancel() + # TODO: this is justification for a # ``ActorNursery.stream_from_actor()`` helper? await portal.cancel_actor() From 4240efc7e35a66400d990a6b5567fde5bb6037c3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 2 May 2021 14:13:44 -0400 Subject: [PATCH 04/50] Add basic test set --- tests/test_2way.py | 140 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 tests/test_2way.py diff --git a/tests/test_2way.py b/tests/test_2way.py new file mode 100644 index 0000000..fdcec1c --- /dev/null +++ b/tests/test_2way.py @@ -0,0 +1,140 @@ +""" +Bidirectional streaming and context API. +""" + +import trio +import tractor + +# from conftest import tractor_test + +# TODO: test endofchannel semantics / cancellation / error cases: +# 3 possible outcomes: +# - normal termination: far end relays a stop message with +# final value as in async gen from ``return ``. 
+ +# possible outcomes: +# - normal termination: far end returns +# - premature close: far end relays a stop message to tear down stream +# - cancel: far end raises `ContextCancelled` + +# future possible outcomes +# - restart request: far end raises `ContextRestart` + + +_state: bool = False + + +@tractor.context +async def simple_setup_teardown( + + ctx: tractor.Context, + data: int, + +) -> None: + + # startup phase + global _state + _state = True + + # signal to parent that we're up + await ctx.started(data + 1) + + try: + # block until cancelled + await trio.sleep_forever() + finally: + _state = False + + +async def assert_state(value: bool): + global _state + assert _state == value + + +def test_simple_contex(): + + async def main(): + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'simple_context', + enable_modules=[__name__], + ) + + async with portal.open_context( + simple_setup_teardown, + data=10, + ) as (ctx, sent): + + assert sent == 11 + + await portal.run(assert_state, value=True) + + # after cancellation + await portal.run(assert_state, value=False) + + # shut down daemon + await portal.cancel_actor() + + trio.run(main) + + +@tractor.context +async def simple_rpc( + + ctx: tractor.Context, + data: int, + +) -> None: + + # signal to parent that we're up + await ctx.started(data + 1) + + print('opening stream in callee') + async with ctx.open_stream() as stream: + + count = 0 + while True: + try: + await stream.receive() == 'ping' + except trio.EndOfChannel: + assert count == 10 + break + else: + print('pong') + await stream.send('pong') + count += 1 + + +def test_simple_rpc(): + """The simplest request response pattern. + + """ + async def main(): + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'rpc_server', + enable_modules=[__name__], + ) + + async with portal.open_context( + simple_rpc, + data=10, + ) as (ctx, sent): + + assert sent == 11 + + async with ctx.open_stream() as stream: + + for _ in range(10): + + print('ping') + await stream.send('ping') + assert await stream.receive() == 'pong' + + # stream should terminate here + + await portal.cancel_actor() + + trio.run(main) From 1f8966ba647f708d52e01230a06e2d0e7b6c47f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 May 2021 11:20:51 -0400 Subject: [PATCH 05/50] Support passing `shield` at stream contruction --- tractor/_streaming.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 1c46801..10be2c6 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -37,10 +37,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.abc.ReceiveChannel, + shield: bool = False, ) -> None: self._ctx = ctx self._rx_chan = rx_chan - self._shielded = False + self._shielded = shield # delegate directly to underlying mem channel def receive_nowait(self): @@ -112,6 +113,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): """Shield this stream's underlying channel such that a local consumer task can be cancelled (and possibly restarted) using ``trio.Cancelled``. + Note that here, "shielding" here guards against relaying + a ``'stop'`` message to the far end of the stream thus keeping + the stream machinery active and ready for further use, it does + not have anything to do with an internal ``trio.CancelScope``. 
+ """ self._shielded = True yield self @@ -162,7 +168,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): await self._ctx.send_stop() # close the local mem chan - rx_chan.close() + await rx_chan.aclose() # TODO: but make it broadcasting to consumers # def clone(self): @@ -281,6 +287,7 @@ class Context: @asynccontextmanager async def open_stream( self, + shield: bool = False, ) -> MsgStream: # TODO @@ -299,7 +306,11 @@ class Context: self.cid ) - async with MsgStream(ctx=self, rx_chan=recv_chan) as rchan: + async with MsgStream( + ctx=self, + rx_chan=recv_chan, + shield=shield, + ) as rchan: if self._portal: self._portal._streams.add(rchan) @@ -308,9 +319,11 @@ class Context: yield rchan finally: + # signal ``StopAsyncIteration`` on far end. await self.send_stop() + if self._portal: - self._portal._streams.add(rchan) + self._portal._streams.remove(rchan) async def started(self, value: Any) -> None: From 98133a984e0f4e170c62109301f726c3c6f64438 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 May 2021 11:41:18 -0400 Subject: [PATCH 06/50] Parametrize with async for style tests --- tests/test_2way.py | 68 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index fdcec1c..82ee792 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -1,7 +1,8 @@ """ Bidirectional streaming and context API. -""" +""" +import pytest import trio import tractor @@ -86,7 +87,9 @@ async def simple_rpc( data: int, ) -> None: + """Test a small ping-pong server. + """ # signal to parent that we're up await ctx.started(data + 1) @@ -106,7 +109,44 @@ async def simple_rpc( count += 1 -def test_simple_rpc(): +@tractor.context +async def simple_rpc_with_forloop( + + ctx: tractor.Context, + data: int, + +) -> None: + """Same as previous test but using ``async for`` syntax/api. + + """ + + # signal to parent that we're up + await ctx.started(data + 1) + + print('opening stream in callee') + async with ctx.open_stream() as stream: + + count = 0 + async for msg in stream: + + assert msg == 'ping' + print('pong') + await stream.send('pong') + count += 1 + + else: + assert count == 10 + + +@pytest.mark.parametrize( + 'use_async_for', + [True, False], +) +@pytest.mark.parametrize( + 'server_func', + [simple_rpc, simple_rpc_with_forloop], +) +def test_simple_rpc(server_func, use_async_for): """The simplest request response pattern. 
""" @@ -119,7 +159,7 @@ def test_simple_rpc(): ) async with portal.open_context( - simple_rpc, + server_func, # taken from pytest parameterization data=10, ) as (ctx, sent): @@ -127,11 +167,29 @@ def test_simple_rpc(): async with ctx.open_stream() as stream: - for _ in range(10): + if use_async_for: + count = 0 + # receive msgs using async for style print('ping') await stream.send('ping') - assert await stream.receive() == 'pong' + + async for msg in stream: + assert msg == 'pong' + print('ping') + await stream.send('ping') + count += 1 + + if count >= 9: + break + + else: + # classic send/receive style + for _ in range(10): + + print('ping') + await stream.send('ping') + assert await stream.receive() == 'pong' # stream should terminate here From 08eb6bd0190f5e10781d4e929b5093beb3ede30e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 May 2021 11:52:08 -0400 Subject: [PATCH 07/50] Fix typing --- tractor/_portal.py | 2 +- tractor/_streaming.py | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index e13792b..8944489 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -340,7 +340,7 @@ class Portal: self, func: Callable, **kwargs, - ) -> Context: + ) -> AsyncGenerator[Tuple[Context, Any], None]: """Open an inter-actor task context. This is a synchronous API which allows for deterministic diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 10be2c6..9bc32e3 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,7 +1,15 @@ +""" +Message stream types and APIs. + +""" import inspect from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass -from typing import Any, Iterator, Optional, Callable +from typing import ( + Any, Iterator, Optional, Callable, + AsyncGenerator, +) + import warnings import trio @@ -288,7 +296,7 @@ class Context: async def open_stream( self, shield: bool = False, - ) -> MsgStream: + ) -> AsyncGenerator[MsgStream, None]: # TODO actor = current_actor() @@ -339,7 +347,9 @@ def stream(func: Callable) -> Callable: """ # annotate - func._tractor_stream_function = True + # TODO: apply whatever solution ``mypy`` ends up picking for this: + # https://github.com/python/mypy/issues/2087#issuecomment-769266912 + func._tractor_stream_function = True # type: ignore sig = inspect.signature(func) params = sig.parameters @@ -369,7 +379,9 @@ def context(func: Callable) -> Callable: """ # annotate - func._tractor_context_function = True + # TODO: apply whatever solution ``mypy`` ends up picking for this: + # https://github.com/python/mypy/issues/2087#issuecomment-769266912 + func._tractor_context_function = True # type: ignore sig = inspect.signature(func) params = sig.parameters From e311430d2529e590d4b19e50074327942bb780f6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 May 2021 07:23:39 -0400 Subject: [PATCH 08/50] Be more pedantic with error handling --- tractor/_portal.py | 44 +++++++++++++++++++++++++------------------- tractor/_trionics.py | 4 ++-- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 8944489..0ce1a53 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -358,33 +358,39 @@ class Portal: fn_mod_path, fn_name = func_deats(func) - cid, recv_chan, functype, first_msg = await self._submit( - fn_mod_path, fn_name, kwargs) - assert functype == 'context' - - msg = await recv_chan.receive() try: - # the "first" value here is delivered by the callee's - # 
``Context.started()`` call. - first = msg['started'] + cid, recv_chan, functype, first_msg = await self._submit( + fn_mod_path, fn_name, kwargs) - except KeyError: - assert msg.get('cid'), ("Received internal error at context?") + assert functype == 'context' + msg = await recv_chan.receive() - if msg.get('error'): - # raise the error message - raise unpack_error(msg, self.channel) - else: - raise - try: + try: + # the "first" value here is delivered by the callee's + # ``Context.started()`` call. + first = msg['started'] + + except KeyError: + assert msg.get('cid'), ("Received internal error at context?") + + if msg.get('error'): + # raise the error message + raise unpack_error(msg, self.channel) + else: + raise + + # deliver context instance and .started() msg value in open + # tuple. ctx = Context(self.channel, cid, _portal=self) - yield ctx, first + try: + yield ctx, first + + finally: + await ctx.cancel() finally: await recv_chan.aclose() - await ctx.cancel() - @dataclass class LocalPortal: diff --git a/tractor/_trionics.py b/tractor/_trionics.py index dcf7aa5..894ab7d 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -357,7 +357,8 @@ async def open_nursery( try: if actor is None and is_main_process(): - # if we are the parent process start the actor runtime implicitly + # if we are the parent process start the + # actor runtime implicitly log.info("Starting actor runtime!") # mark us for teardown on exit @@ -376,7 +377,6 @@ async def open_nursery( async with _open_and_supervise_one_cancels_all_nursery( actor ) as anursery: - yield anursery finally: From e5bc07f355936e10b33af3f6a3ac506c20f1a80b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 May 2021 23:30:10 -0400 Subject: [PATCH 09/50] Add dynamic pubsub test using new bidir stream apis --- tests/test_advanced_streaming.py | 142 +++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 tests/test_advanced_streaming.py diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py new file mode 100644 index 0000000..246fc1b --- /dev/null +++ b/tests/test_advanced_streaming.py @@ -0,0 +1,142 @@ +""" +Advanced streaming patterns using bidirectional streams and contexts. 
+ +""" +import itertools +from typing import Set, Dict, List + +import trio +import tractor + + +_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = { + 'even': set(), + 'odd': set(), +} + + +async def publisher( + + seed: int = 0, + +) -> None: + + global _registry + + def is_even(i): + return i % 2 == 0 + + for val in itertools.count(seed): + + sub = 'even' if is_even(val) else 'odd' + + for sub_stream in _registry[sub]: + await sub_stream.send(val) + + # throttle send rate to ~4Hz + # making it readable to a human user + await trio.sleep(1/4) + + +@tractor.context +async def subscribe( + + ctx: tractor.Context, + +) -> None: + + global _registry + + # syn caller + await ctx.started(None) + + async with ctx.open_stream() as stream: + + # update subs list as consumer requests + async for new_subs in stream: + + new_subs = set(new_subs) + remove = new_subs - _registry.keys() + + print(f'setting sub to {new_subs} for {ctx.chan.uid}') + + # remove old subs + for sub in remove: + _registry[sub].remove(stream) + + # add new subs for consumer + for sub in new_subs: + _registry[sub].add(stream) + + +async def consumer( + + subs: List[str], + +) -> None: + + uid = tractor.current_actor().uid + + async with tractor.wait_for_actor('publisher') as portal: + async with portal.open_context(subscribe) as (ctx, first): + async with ctx.open_stream() as stream: + + # flip between the provided subs dynamically + if len(subs) > 1: + + for sub in itertools.cycle(subs): + print(f'setting dynamic sub to {sub}') + await stream.send([sub]) + + count = 0 + async for value in stream: + print(f'{uid} got: {value}') + if count > 5: + break + count += 1 + + else: # static sub + + await stream.send(subs) + async for value in stream: + print(f'{uid} got: {value}') + + +def test_dynamic_pub_sub(): + + global _registry + + from multiprocessing import cpu_count + cpus = cpu_count() + + async def main(): + async with tractor.open_nursery() as n: + + # name of this actor will be same as target func + await n.run_in_actor(publisher) + + for i, sub in zip( + range(cpus - 2), + itertools.cycle(_registry.keys()) + ): + await n.run_in_actor( + consumer, + name=f'consumer_{sub}', + subs=[sub], + ) + + # make one dynamic subscriber + await n.run_in_actor( + consumer, + name='consumer_dynamic', + subs=list(_registry.keys()), + ) + + # block until cancelled by user + with trio.fail_after(10): + await trio.sleep_forever() + + try: + trio.run(main) + except trio.TooSlowError: + pass From 6559fb72aa752b1a8684889f1dd81c50c72c5421 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 May 2021 23:41:26 -0400 Subject: [PATCH 10/50] Expose msg stream types at top level --- tractor/__init__.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 25b56b1..9ff62cc 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -5,7 +5,13 @@ tractor: An actor model micro-framework built on from trio import MultiError from ._ipc import Channel -from ._streaming import Context, stream, context +from ._streaming import ( + Context, + ReceiveMsgStream, + MsgStream, + stream, + context, +) from ._discovery import get_arbiter, find_actor, wait_for_actor from ._trionics import open_nursery from ._state import current_actor, is_root_process From a2e2f7e7a89dad8c4271a21b62eac812394ff71a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 May 2021 23:42:34 -0400 Subject: [PATCH 11/50] Only send stop msg if not received from far end --- tractor/_streaming.py | 10 ++++++++-- 1 
file changed, 8 insertions(+), 2 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9bc32e3..085d994 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -53,7 +53,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # delegate directly to underlying mem channel def receive_nowait(self): - return self._rx_chan.receive_nowait() + msg = self._rx_chan.receive_nowait() + return msg['yield'] async def receive(self): try: @@ -106,6 +107,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # when the send is closed we assume the stream has # terminated and signal this local iterator to stop await self.aclose() + # await self._ctx.send_stop() raise StopAsyncIteration @@ -326,10 +328,14 @@ class Context: try: yield rchan - finally: + except trio.EndOfChannel: + raise + + else: # signal ``StopAsyncIteration`` on far end. await self.send_stop() + finally: if self._portal: self._portal._streams.remove(rchan) From 9a4244b9a6e8b06aca2ef1b15404b4cbd65a4abd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 25 May 2021 09:19:07 -0400 Subject: [PATCH 12/50] Support no arg to `Context.started()` like trio --- tractor/_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 085d994..fb5f8a8 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -339,7 +339,7 @@ class Context: if self._portal: self._portal._streams.remove(rchan) - async def started(self, value: Any) -> None: + async def started(self, value: Optional[Any] = None) -> None: if self._portal: raise RuntimeError( From 5b8b7d374ae8c802e4359204c96b1671df3bad10 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 13:57:16 -0400 Subject: [PATCH 13/50] Add error case --- tests/test_2way.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index 82ee792..fec0cae 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -52,9 +52,14 @@ async def assert_state(value: bool): assert _state == value -def test_simple_contex(): +@pytest.mark.parametrize( + 'error_parent', + [False, True], +) +def test_simple_context(error_parent): async def main(): + async with tractor.open_nursery() as n: portal = await n.start_actor( @@ -74,10 +79,19 @@ def test_simple_contex(): # after cancellation await portal.run(assert_state, value=False) + if error_parent: + raise ValueError + # shut down daemon await portal.cancel_actor() - trio.run(main) + if error_parent: + try: + trio.run(main) + except ValueError: + pass + else: + trio.run(main) @tractor.context From 39b9896a62b9507901e49d76ecfa103689c5d452 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 13:58:06 -0400 Subject: [PATCH 14/50] Only close recv chan if we get a ref --- tractor/_portal.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 0ce1a53..7f29e0e 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -359,6 +359,7 @@ class Portal: fn_mod_path, fn_name = func_deats(func) + recv_chan: trio.ReceiveMemoryChannel = None try: cid, recv_chan, functype, first_msg = await self._submit( fn_mod_path, fn_name, kwargs) @@ -390,7 +391,8 @@ class Portal: await ctx.cancel() finally: - await recv_chan.aclose() + if recv_chan is not None: + await recv_chan.aclose() @dataclass class LocalPortal: From 3999849b0388005c2357a1dc0f045deda357d527 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 14:00:09 -0400 
Subject: [PATCH 15/50] Add a multi-task streaming test --- tests/test_advanced_streaming.py | 78 ++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 246fc1b..b5cf2b9 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -140,3 +140,81 @@ def test_dynamic_pub_sub(): trio.run(main) except trio.TooSlowError: pass + + +@tractor.context +async def one_task_streams_and_one_handles_reqresp( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + + async with ctx.open_stream() as stream: + + async def pingpong(): + '''Run a simple req/response service. + + ''' + async for msg in stream: + print('rpc server ping') + assert msg == 'ping' + print('rpc server pong') + await stream.send('pong') + + async with trio.open_nursery() as n: + n.start_soon(pingpong) + + for _ in itertools.count(): + await stream.send('yo') + await trio.sleep(0.01) + + +def test_reqresp_ontopof_streaming(): + '''Test a subactor that both streams with one task and + spawns another which handles a small requests-response + dialogue over the same bidir-stream. + + ''' + async def main(): + + with trio.move_on_after(2): + async with tractor.open_nursery() as n: + + # name of this actor will be same as target func + portal = await n.start_actor( + 'dual_tasks', + enable_modules=[__name__] + ) + + # flat to make sure we get at least one pong + got_pong: bool = False + + async with portal.open_context( + one_task_streams_and_one_handles_reqresp, + + ) as (ctx, first): + + assert first is None + + async with ctx.open_stream() as stream: + + await stream.send('ping') + + async for msg in stream: + print(f'client received: {msg}') + + assert msg in {'pong', 'yo'} + + if msg == 'pong': + got_pong = True + await stream.send('ping') + print('client sent ping') + + assert got_pong + + try: + trio.run(main) + except trio.TooSlowError: + pass From eb3662f981e4f6b5d55f35c0931018750baeaacc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 18:01:49 -0400 Subject: [PATCH 16/50] Add a specially handled `ContextCancelled` error --- tractor/_exceptions.py | 53 +++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index f6d9f47..07ea4e6 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -1,7 +1,7 @@ """ Our classy exception set. 
""" -from typing import Dict, Any +from typing import Dict, Any, Optional import importlib import builtins import traceback @@ -15,17 +15,16 @@ _this_mod = importlib.import_module(__name__) class RemoteActorError(Exception): # TODO: local recontruction of remote exception deats "Remote actor exception bundled locally" - def __init__(self, message, type_str, **msgdata) -> None: - super().__init__(message) - for ns in [builtins, _this_mod, trio]: - try: - self.type = getattr(ns, type_str) - break - except AttributeError: - continue - else: - self.type = Exception + def __init__( + self, + message: str, + suberror_type: Optional[Exception] = None, + **msgdata + ) -> None: + super().__init__(message) + + self.type = suberror_type self.msgdata = msgdata # TODO: a trio.MultiError.catch like context manager @@ -41,6 +40,9 @@ class InternalActorError(RemoteActorError): class TransportClosed(trio.ClosedResourceError): "Underlying channel transport was closed prior to use" +class ContextCancelled(RemoteActorError): + "Inter-actor task context cancelled itself on the callee side." + class NoResult(RuntimeError): "No final result is expected for this actor" @@ -77,12 +79,35 @@ def unpack_error( into a local ``RemoteActorError``. """ - tb_str = msg['error'].get('tb_str', '') - return err_type( - f"{chan.uid}\n" + tb_str, + error = msg['error'] + + tb_str = error.get('tb_str', '') + message = f"{chan.uid}\n" + tb_str + type_name = error['type_str'] + suberror_type = Exception + + if type_name == 'ContextCancelled': + err_type = ContextCancelled + suberror_type = trio.Cancelled + + else: # try to lookup a suitable local error type + for ns in [builtins, _this_mod, trio]: + try: + suberror_type = getattr(ns, type_name) + break + except AttributeError: + continue + + exc = err_type( + message, + suberror_type=suberror_type, + + # unpack other fields into error type init **msg['error'], ) + return exc + def is_multi_cancelled(exc: BaseException) -> bool: """Predicate to determine if a ``trio.MultiError`` contains only From 409f7f0d5ac6c3316b2662ae5a9f5b1bf02e45bf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 18:02:27 -0400 Subject: [PATCH 17/50] Expose streaming components at top level --- tractor/__init__.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 9ff62cc..a7cadb9 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -15,7 +15,11 @@ from ._streaming import ( from ._discovery import get_arbiter, find_actor, wait_for_actor from ._trionics import open_nursery from ._state import current_actor, is_root_process -from ._exceptions import RemoteActorError, ModuleNotExposed +from ._exceptions import ( + RemoteActorError, + ModuleNotExposed, + ContextCancelled, +) from ._debug import breakpoint, post_mortem from . 
import msg
from ._root import run, run_daemon, open_root_actor
@@ -27,6 +31,7 @@ __all__ = [
     'ModuleNotExposed',
     'MultiError',
     'RemoteActorError',
+    'ContextCancelled',
     'breakpoint',
     'current_actor',
     'find_actor',
@@ -40,6 +45,8 @@ __all__ = [
     'run_daemon',
     'stream',
     'context',
+    'ReceiveMsgStream',
+    'MsgStream',
     'to_asyncio',
     'wait_for_actor',
 ]

From 73302d9d16c005b74c4d3981f44d58a37ddd05c2 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 13 Jun 2021 18:03:50 -0400
Subject: [PATCH 18/50] Specially raise a `ContextCancelled` for a
 task-context rpc

---
 tractor/_actor.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 0169784..4b215f8 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -28,6 +28,7 @@ from ._exceptions import (
     unpack_error,
     ModuleNotExposed,
     is_multi_cancelled,
+    ContextCancelled,
     TransportClosed,
 )
 from . import _debug
@@ -152,9 +153,12 @@ async def _invoke(
                 task_status.started(cs)
                 await chan.send({'return': await coro, 'cid': cid})

-            # if cs.cancelled_caught:
-            #     # task was cancelled so relay the cancel to the caller
-            #     await chan.send({'return': await coro, 'cid': cid})
+            if cs.cancelled_caught:
+                # task-context was cancelled so relay the cancel to the caller
+                raise ContextCancelled(
+                    f'{func.__name__} cancelled itself',
+                    suberror_type=trio.Cancelled,
+                )

         else:
             # regular async function
@@ -168,7 +172,8 @@ async def _invoke(
         # TODO: maybe we'll want different "levels" of debugging
         # eventually such as ('app', 'supervisory', 'runtime') ?
         if not isinstance(err, trio.ClosedResourceError) and (
-            not is_multi_cancelled(err)
+            not is_multi_cancelled(err)) and (
+            not isinstance(err, ContextCancelled)
         ):
             # XXX: is there any case where we'll want to debug IPC
             # disconnects? I can't think of a reason that inspecting

From 348148ff1ed7a4e321b6ab06e1b3bd6a18deec34 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 13 Jun 2021 19:58:52 -0400
Subject: [PATCH 19/50] Explicitly formalize context/streaming teardown

Add clear teardown semantics for `Context` such that the remote side
cancellation propagation happens only on error or if client code
explicitly requests it (either by exit flag to `Portal.open_context()`
or by manually calling `Context.cancel()`).

Add `Context.result()` to wait on and capture the final result from a
remote context function; any lingering msg sequence will be
consumed/discarded.

Changes in order to make this possible:
- pass the runtime msg loop's feeder receive channel in to the context
  on the calling (portal opening) side such that a final 'return' msg
  can be waited upon using `Context.result()` which delivers the final
  return value from the callee side `@tractor.context` async function.
- always await a final result from the target context function in
  `Portal.open_context()`'s `__aexit__()` if the context has not been
  (requested to be) cancelled by client code on block exit.
- add an internal `Context._cancel_called` for context "cancel
  requested" tracking (much like `trio`'s cancel scope).
- allow flagging a stream as terminated using an internal `._eoc` flag
  which will mark the stream as stopped for iteration.
- drop `StopAsyncIteration` catching in `.receive()`; it does nothing.
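
For reference, a minimal caller-side sketch of the semantics described
above (the `child` function and `demo` actor names here are
illustrative only, not part of this changeset):

    import trio
    import tractor

    @tractor.context
    async def child(ctx: tractor.Context) -> str:
        # callee: deliver a first value to sync the caller ...
        await ctx.started('setup done')
        # ... then terminate normally, delivering a final result
        return 'final'

    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor('demo', enable_modules=[__name__])

            async with portal.open_context(child) as (ctx, first):
                assert first == 'setup done'
                # explicitly wait on the callee's 'return' msg; on
                # a clean block exit ``__aexit__()`` does this
                # implicitly when ``Context.cancel()`` was not called
                assert await ctx.result() == 'final'

            await portal.cancel_actor()

    trio.run(main)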
--- tractor/_portal.py | 58 +++++++++++-- tractor/_streaming.py | 194 +++++++++++++++++++++++++++++++++++------- 2 files changed, 210 insertions(+), 42 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 7f29e0e..32e71b0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -17,7 +17,12 @@ from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel from .log import get_logger -from ._exceptions import unpack_error, NoResult, RemoteActorError +from ._exceptions import ( + unpack_error, + NoResult, + RemoteActorError, + ContextCancelled, +) from ._streaming import Context, ReceiveMsgStream @@ -84,7 +89,7 @@ class Portal: ns: str, func: str, kwargs, - ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: + ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -327,7 +332,14 @@ class Portal: # message right now since there shouldn't be a reason to # stop and restart the stream, right? try: + + # We are for sure done with this stream and no more + # messages are expected to be delivered from the + # runtime's msg loop. + await recv_chan.aclose() + await ctx.cancel() + except trio.ClosedResourceError: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. @@ -337,18 +349,21 @@ class Portal: @asynccontextmanager async def open_context( + self, func: Callable, + cancel_on_exit: bool = False, **kwargs, + ) -> AsyncGenerator[Tuple[Context, Any], None]: - """Open an inter-actor task context. + '''Open an inter-actor task context. This is a synchronous API which allows for deterministic setup/teardown of a remote task. The yielded ``Context`` further - allows for opening bidirectional streams - see - ``Context.open_stream()``. + allows for opening bidirectional streams, explicit cancellation + and synchronized final result collection. See ``tractor.Context``. - """ + ''' # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -358,8 +373,8 @@ class Portal: fn_mod_path, fn_name = func_deats(func) + recv_chan: Optional[trio.MemoryReceiveChannel] = None - recv_chan: trio.ReceiveMemoryChannel = None try: cid, recv_chan, functype, first_msg = await self._submit( fn_mod_path, fn_name, kwargs) @@ -383,12 +398,37 @@ class Portal: # deliver context instance and .started() msg value in open # tuple. - ctx = Context(self.channel, cid, _portal=self) + ctx = Context( + self.channel, + cid, + _portal=self, + _recv_chan=recv_chan, + ) + try: yield ctx, first - finally: + if cancel_on_exit: + await ctx.cancel() + + else: + if not ctx._cancel_called: + await ctx.result() + + except ContextCancelled: + # if the context was cancelled by client code + # then we don't need to raise since user code + # is expecting this. + if not ctx._cancel_called: + raise + + except BaseException: + # the context cancels itself on any deviation await ctx.cancel() + raise + + finally: + log.info(f'Context for {func.__name__} completed') finally: if recv_chan is not None: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index fb5f8a8..ea01264 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -35,10 +35,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): which invoked a remote streaming function using `Portal.run()`. 
Termination rules: + - if the local task signals stop iteration a cancel signal is relayed to the remote task indicating to stop streaming - - if the remote task signals the end of a stream, raise a - ``StopAsyncIteration`` to terminate the local ``async for`` + - if the remote task signals the end of a stream, raise + a ``StopAsyncIteration`` to terminate the local ``async for`` """ def __init__( @@ -51,12 +52,19 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self._rx_chan = rx_chan self._shielded = shield + # flag to denote end of stream + self._eoc: bool = False + # delegate directly to underlying mem channel def receive_nowait(self): msg = self._rx_chan.receive_nowait() return msg['yield'] async def receive(self): + # see ``.aclose()`` to an alt to always checking this + if self._eoc: + raise trio.EndOfChannel + try: msg = await self._rx_chan.receive() return msg['yield'] @@ -72,9 +80,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if msg.get('stop'): log.debug(f"{self} was stopped at remote end") + self._eoc = True + # when the send is closed we assume the stream has # terminated and signal this local iterator to stop await self.aclose() + + # XXX: this causes ``ReceiveChannel.__anext__()`` to + # raise a ``StopAsyncIteration``. raise trio.EndOfChannel # TODO: test that shows stream raising an expected error!!! @@ -85,7 +98,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): else: raise - except (trio.ClosedResourceError, StopAsyncIteration): + except trio.ClosedResourceError: # XXX: this indicates that a `stop` message was # sent by the far side of the underlying channel. # Currently this is triggered by calling ``.aclose()`` on @@ -108,8 +121,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # terminated and signal this local iterator to stop await self.aclose() - # await self._ctx.send_stop() - raise StopAsyncIteration + raise # propagate except trio.Cancelled: # relay cancels to the remote task @@ -138,12 +150,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): on close. """ - # TODO: proper adherance to trio's `.aclose()` semantics: + # XXX: keep proper adherance to trio's `.aclose()` semantics: # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan if rx_chan._closed: log.warning(f"{self} is already closed") + + # this stream has already been closed so silently succeed as + # per ``trio.AsyncResource`` semantics. + # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose return # TODO: broadcasting to multiple consumers @@ -173,12 +189,45 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # `Context.open_stream()` will create the `Actor._cids2qs` # entry from a call to `Actor.get_memchans()`. if not self._ctx._portal: - # only for 2 way streams can we can send - # stop from the caller side - await self._ctx.send_stop() + try: + # only for 2 way streams can we can send + # stop from the caller side + await self._ctx.send_stop() - # close the local mem chan - await rx_chan.aclose() + except trio.BrokenResourceError: + # the underlying channel may already have been pulled + # in which case our stop message is meaningless since + # it can't traverse the transport. + log.debug(f'Channel for {self} was already closed') + + self._eoc = True + + # close the local mem chan??!? + + # NOT if we're a ``MsgStream``! 
+ # BECAUSE this same core-msg-loop mem recv-chan is used to deliver + # the potential final result from the surrounding inter-actor + # `Context` so we don't want to close it until that context has + # run to completion. + + # XXX: Notes on old behaviour. + + # In the receive-only case, ``Portal.open_stream_from()`` should + # call this explicitly on teardown but additionally if for some + # reason stream consumer code tries to manually receive a new + # value before ``.aclose()`` is called **but** the far end has + # stopped `.receive()` **must** raise ``trio.EndofChannel`` in + # order to avoid an infinite hang on ``.__anext__()``. So we can + # instead uncomment this check and close the underlying msg-loop + # mem chan below and not then **not** check for ``self._eoc`` in + # ``.receive()`` (if for some reason we think that check is + # a bottle neck - not likely) such that the + # ``trio.ClosedResourceError`` would instead trigger the + # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was + # before bi-dir streaming support). + + # if not isinstance(self, MsgStream): + # await rx_chan.aclose() # TODO: but make it broadcasting to consumers # def clone(self): @@ -206,29 +255,29 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) -@dataclass(frozen=True) +@dataclass class Context: - """An IAC (inter-actor communication) context. + '''An inter-actor task communication context. - Allows maintaining task or protocol specific state between communicating - actors. A unique context is created on the receiving end for every request - to a remote actor. + Allows maintaining task or protocol specific state between + 2 communicating actor tasks. A unique context is created on the + callee side/end for every request to a remote actor from a portal. - A context can be cancelled and (eventually) restarted from + A context can be cancelled and (possibly eventually restarted) from either side of the underlying IPC channel. - A context can be used to open task oriented message streams. + A context can be used to open task oriented message streams and can + be thought of as an IPC aware inter-actor cancel scope. - """ + ''' chan: Channel cid: str - # TODO: should we have seperate types for caller vs. callee - # side contexts? The caller always opens a portal whereas the callee - # is always responding back through a context-stream - # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa + _recv_chan: Optional[trio.MemoryReceiveChannel] = None + _result: Optional[Any] = False + _cancel_called: bool = False # only set on the callee side _cancel_scope: Optional[trio.CancelScope] = None @@ -247,12 +296,14 @@ class Context: await self.chan.send({'stop': True, 'cid': self.cid}) async def cancel(self) -> None: - """Cancel this inter-actor-task context. + '''Cancel this inter-actor-task context. Request that the far side cancel it's current linked context, - timeout quickly to sidestep 2-generals... + Timeout quickly in an attempt to sidestep 2-generals... + + ''' + self._cancel_called = True - """ if self._portal: # caller side: if not self._portal: raise RuntimeError( @@ -290,17 +341,31 @@ class Context: # https://github.com/goodboy/tractor/issues/36 self._cancel_scope.cancel() - # TODO: do we need a restart api? 
-    # async def restart(self) -> None:
-    #     pass
-
     @asynccontextmanager
     async def open_stream(
+
         self,
         shield: bool = False,
-    ) -> AsyncGenerator[MsgStream, None]:
-        # TODO
+
+    ) -> AsyncGenerator[MsgStream, None]:
+        '''Open a ``MsgStream``, a bi-directional stream connected to the
+        cross-actor (far end) task for this ``Context``.
+
+        This context manager must be entered on both the caller and
+        callee for the stream to logically be considered "connected".
+
+        A ``MsgStream`` is currently "one-shot" use, meaning if you
+        close it you can not "re-open" it for streaming and instead you
+        must re-establish a new surrounding ``Context`` using
+        ``Portal.open_context()``. In the future this may change but
+        currently there seems to be no obvious reason to support
+        "re-opening":
+          - pausing a stream can be done with a message.
+          - task errors will normally require a restart of the entire
+            scope of the inter-actor task context due to the nature of
+            ``trio``'s cancellation system.
+
+        '''
         actor = current_actor()

         # here we create a mem chan that corresponds to the
         # far end caller / callee.
@@ -316,6 +381,19 @@ class Context:
             self.cid
         )

+        # XXX: If the underlying receive mem chan has been closed then
+        # likely client code has already exited a ``.open_stream()``
+        # block prior. We error here until such a time that we decide
+        # allowing streams to be "re-connected" is supported and/or
+        # a good idea.
+        if recv_chan._closed:
+            task = trio.lowlevel.current_task().name
+            raise trio.ClosedResourceError(
+                f'stream for {actor.uid[0]}:{task} has already been closed.'
+                '\nRe-opening a closed stream is not yet supported!'
+                '\nConsider re-calling the containing `@tractor.context` func'
+            )
+
         async with MsgStream(
             ctx=self,
             rx_chan=recv_chan,
@@ -326,19 +404,65 @@
                 self._portal._streams.add(rchan)

             try:
+                # ensure we aren't cancelled before delivering
+                # the stream
+                # await trio.lowlevel.checkpoint()
                 yield rchan

             except trio.EndOfChannel:
+                # stream iteration stop signal
                 raise

             else:
-                # signal ``StopAsyncIteration`` on far end.
+                # XXX: Make the stream "one-shot use". On exit, signal
+                # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
+                # far end.
                 await self.send_stop()

             finally:
                 if self._portal:
                     self._portal._streams.remove(rchan)

+    async def result(self) -> Any:
+        '''From a caller side, wait for and return the final result from
+        the callee side task.
+
+        '''
+        assert self._portal, "Context.result() can not be called from callee!"
+        assert self._recv_chan
+
+        if self._result is False:
+
+            if not self._recv_chan._closed:  # type: ignore
+
+                # wait for a final context result consuming
+                # and discarding any bi-dir stream msgs still
+                # in transit from the far end.
+                while True:
+
+                    msg = await self._recv_chan.receive()
+                    try:
+                        self._result = msg['return']
+                        break
+                    except KeyError:
+
+                        if 'yield' in msg:
+                            # far end task is still streaming to us..
+                            log.warning(f'Remote stream delivered {msg}')
+                            # do discard
+                            continue
+
+                        elif 'stop' in msg:
+                            log.debug('Remote stream terminated')
+                            continue
+
+                        # internal error should never get here
+                        assert msg.get('cid'), (
+                            "Received internal error at portal?")
+                        raise unpack_error(msg, self._portal.channel)
+
+        return self._result
+
     async def started(self, value: Optional[Any] = None) -> None:

         if self._portal:
@@ -347,6 +471,10 @@

         await self.chan.send({'started': value, 'cid': self.cid})

+    # TODO: do we need a restart api?
+ # async def restart(self) -> None: + # pass + def stream(func: Callable) -> Callable: """Mark an async function as a streaming routine with ``@stream``. From 1a69727b75390d118537d463b9925a49043e2128 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 20:19:52 -0400 Subject: [PATCH 20/50] Fix exception typing --- tractor/_exceptions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 07ea4e6..d213a20 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -1,7 +1,7 @@ """ Our classy exception set. """ -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, Type import importlib import builtins import traceback @@ -18,7 +18,7 @@ class RemoteActorError(Exception): def __init__( self, message: str, - suberror_type: Optional[Exception] = None, + suberror_type: Optional[Type[BaseException]] = None, **msgdata ) -> None: @@ -84,7 +84,7 @@ def unpack_error( tb_str = error.get('tb_str', '') message = f"{chan.uid}\n" + tb_str type_name = error['type_str'] - suberror_type = Exception + suberror_type: Type[BaseException] = Exception if type_name == 'ContextCancelled': err_type = ContextCancelled From 54916be6017ff9c6c7be0f1558001fd7f356a595 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 20:21:49 -0400 Subject: [PATCH 21/50] Adjustments for non-frozen context dataclass change --- tractor/msg.py | 48 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index 560e644..28e3405 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -1,9 +1,13 @@ """ Messaging pattern APIs and helpers. + +NOTE: this module is likely deprecated by the new bi-directional streaming +support provided by ``tractor.Context.open_stream()`` and friends. + """ import inspect import typing -from typing import Dict, Any, Set, Callable +from typing import Dict, Any, Set, Callable, List, Tuple from functools import partial from async_generator import aclosing @@ -20,7 +24,7 @@ log = get_logger('messaging') async def fan_out_to_ctxs( pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy - topics2ctxs: Dict[str, set], + topics2ctxs: Dict[str, list], packetizer: typing.Callable = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. 
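The ``Dict[str, set]`` -> ``Dict[str, list]`` switch just above falls out
of ``Context`` no longer being a frozen dataclass: with the default
``eq=True`` and ``frozen=False``, ``@dataclass`` sets ``__hash__`` to
``None``, making instances unhashable, so they can no longer be stored
in a ``set``. A minimal sketch of the failure mode (the class names here
are illustrative, not from the patch):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class FrozenCtx:
        cid: str

    @dataclass
    class MutableCtx:  # like the now non-frozen ``Context``
        cid: str

    subs = {FrozenCtx('a')}  # fine: frozen dataclasses are hashable

    try:
        subs = {MutableCtx('a')}
    except TypeError:
        # unhashable type: 'MutableCtx' -> hence the move to ``list``s
        pass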
@@ -34,24 +38,27 @@ async def fan_out_to_ctxs( async for published in pub_gen: - ctx_payloads: Dict[str, Any] = {} + ctx_payloads: List[Tuple[Context, Any]] = [] for topic, data in published.items(): log.debug(f"publishing {topic, data}") + # build a new dict packet or invoke provided packetizer if packetizer is None: packet = {topic: data} + else: packet = packetizer(topic, data) - for ctx in topics2ctxs.get(topic, set()): - ctx_payloads.setdefault(ctx, {}).update(packet), + + for ctx in topics2ctxs.get(topic, list()): + ctx_payloads.append((ctx, packet)) if not ctx_payloads: log.debug(f"Unconsumed values:\n{published}") # deliver to each subscriber (fan out) if ctx_payloads: - for ctx, payload in ctx_payloads.items(): + for ctx, payload in ctx_payloads: try: await ctx.send_yield(payload) except ( @@ -60,15 +67,24 @@ async def fan_out_to_ctxs( ConnectionRefusedError, ): log.warning(f"{ctx.chan} went down?") - for ctx_set in topics2ctxs.values(): - ctx_set.discard(ctx) + for ctx_list in topics2ctxs.values(): + try: + ctx_list.remove(ctx) + except ValueError: + continue if not get_topics(): log.warning(f"No subscribers left for {pub_gen}") break -def modify_subs(topics2ctxs, topics, ctx): +def modify_subs( + + topics2ctxs: Dict[str, List[Context]], + topics: Set[str], + ctx: Context, + +) -> None: """Absolute symbol subscription list for each quote stream. Effectively a symbol subscription api. @@ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx): # update map from each symbol to requesting client's chan for topic in topics: - topics2ctxs.setdefault(topic, set()).add(ctx) + topics2ctxs.setdefault(topic, list()).append(ctx) # remove any existing symbol subscriptions if symbol is not # found in ``symbols`` @@ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx): for topic in filter( lambda topic: topic not in topics, topics2ctxs.copy() ): - ctx_set = topics2ctxs.get(topic) - ctx_set.discard(ctx) + ctx_list = topics2ctxs.get(topic) + if ctx_list: + try: + ctx_list.remove(ctx) + except ValueError: + pass - if not ctx_set: + if not ctx_list: # pop empty sets which will trigger bg quoter task termination topics2ctxs.pop(topic) @@ -256,7 +276,7 @@ def pub( respawn = True finally: # remove all subs for this context - modify_subs(topics2ctxs, (), ctx) + modify_subs(topics2ctxs, set(), ctx) # if there are truly no more subscriptions with this broker # drop from broker subs dict From 196dea80db6f703b3a43ca4eace86927250b5381 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 20:22:26 -0400 Subject: [PATCH 22/50] Drop trailing comma --- tractor/testing/_tractor_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 734e367..ebb6f32 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -78,7 +78,7 @@ def tractor_test(fn): else: # use implicit root actor start - main = partial(fn, *args, **kwargs), + main = partial(fn, *args, **kwargs) return trio.run(main) # arbiter_addr=arb_addr, From 7c5fd8ce9f38fcb1f6eb7077384de2c056e8cfac Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 20:25:36 -0400 Subject: [PATCH 23/50] Add detailed ``@tractor.context`` cancellation/termination tests --- tests/test_2way.py | 320 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 303 insertions(+), 17 deletions(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index fec0cae..716e2ce 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py 
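The tests in this patch lean on the ``tractor_test`` decorator whose
trailing-comma bug was just fixed above (the stray comma made ``main``
a 1-tuple, which ``trio.run()`` can't call). Roughly, and ignoring the
arbiter-address and start-method plumbing the real helper threads
through, it wraps an async test function so pytest runs it under
``trio.run()``; a simplified sketch under those assumptions:

    from functools import partial
    import trio

    def tractor_test(fn):
        def wrapper(*args, **kwargs):
            # bundle the test coro with its args and drive it with trio
            main = partial(fn, *args, **kwargs)
            return trio.run(main)
        return wrapper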
@@ -6,20 +6,13 @@ import pytest import trio import tractor -# from conftest import tractor_test +from conftest import tractor_test -# TODO: test endofchannel semantics / cancellation / error cases: -# 3 possible outcomes: -# - normal termination: far end relays a stop message with -# final value as in async gen from ``return ``. - -# possible outcomes: -# - normal termination: far end returns -# - premature close: far end relays a stop message to tear down stream -# - cancel: far end raises `ContextCancelled` - -# future possible outcomes -# - restart request: far end raises `ContextRestart` +# the general stream semantics are +# - normal termination: far end relays a stop message which +# terminates an ongoing ``MsgStream`` iteration +# - cancel termination: context is cancelled on either side cancelling +# the "linked" inter-actor task context _state: bool = False @@ -30,6 +23,7 @@ async def simple_setup_teardown( ctx: tractor.Context, data: int, + block_forever: bool = False, ) -> None: @@ -41,8 +35,11 @@ async def simple_setup_teardown( await ctx.started(data + 1) try: - # block until cancelled - await trio.sleep_forever() + if block_forever: + # block until cancelled + await trio.sleep_forever() + else: + return 'yo' finally: _state = False @@ -56,7 +53,14 @@ async def assert_state(value: bool): 'error_parent', [False, True], ) -def test_simple_context(error_parent): +@pytest.mark.parametrize( + 'callee_blocks_forever', + [False, True], +) +def test_simple_context( + error_parent, + callee_blocks_forever, +): async def main(): @@ -70,11 +74,16 @@ def test_simple_context(error_parent): async with portal.open_context( simple_setup_teardown, data=10, + block_forever=callee_blocks_forever, ) as (ctx, sent): assert sent == 11 - await portal.run(assert_state, value=True) + if callee_blocks_forever: + await portal.run(assert_state, value=True) + await ctx.cancel() + else: + assert await ctx.result() == 'yo' # after cancellation await portal.run(assert_state, value=False) @@ -94,6 +103,281 @@ def test_simple_context(error_parent): trio.run(main) +# basic stream terminations: +# - callee context closes without using stream +# - caller context closes without using stream +# - caller context calls `Context.cancel()` while streaming +# is ongoing resulting in callee being cancelled +# - callee calls `Context.cancel()` while streaming and caller +# sees stream terminated in `RemoteActorError` + +# TODO: future possible features +# - restart request: far end raises `ContextRestart` + + +@tractor.context +async def close_ctx_immediately( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + global _state + + async with ctx.open_stream(): + pass + + +@tractor_test +async def test_callee_closes_ctx_after_stream_open(): + 'callee context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'fast_stream_closer', + enable_modules=[__name__], + ) + + async with portal.open_context( + close_ctx_immediately, + + # flag to avoid waiting the final result + # cancel_on_exit=True, + + ) as (ctx, sent): + + assert sent is None + + with trio.fail_after(0.5): + async with ctx.open_stream() as stream: + + # should fall through since ``StopAsyncIteration`` + # should be raised through translation of + # a ``trio.EndOfChannel`` by + # ``trio.abc.ReceiveChannel.__anext__()`` + async for _ in stream: + assert 0 + else: + + # verify stream is now closed + try: + await stream.receive() + except trio.EndOfChannel: + pass + + # TODO: should be just raise 
the closed resource err + # directly here to enforce not allowing a re-open + # of a stream to the context (at least until a time of + # if/when we decide that's a good idea?) + try: + async with ctx.open_stream() as stream: + pass + except trio.ClosedResourceError: + pass + + await portal.cancel_actor() + + +@tractor.context +async def expect_cancelled( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.started() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) # echo server + + except trio.Cancelled: + # expected case + _state = False + raise + + else: + assert 0, "Wasn't cancelled!?" + + +@pytest.mark.parametrize( + 'use_ctx_cancel_method', + [False, True], +) +@tractor_test +async def test_caller_closes_ctx_after_callee_opens_stream( + use_ctx_cancel_method: bool, +): + 'caller context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + await portal.run(assert_state, value=True) + + assert sent is None + + # call cancel explicitly + if use_ctx_cancel_method: + + await ctx.cancel() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + else: + assert 0, "Should have context cancelled?" + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + else: + try: + with trio.fail_after(0.2): + await ctx.result() + assert 0, "Callee should have blocked!?" + except trio.TooSlowError: + await ctx.cancel() + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + except trio.ClosedResourceError: + pass + else: + assert 0, "Should have received closed resource error?" 
+ + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor_test +async def test_multitask_caller_cancels_from_nonroot_task(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + + await portal.run(assert_state, value=True) + assert sent is None + + async with ctx.open_stream() as stream: + + async def send_msg_then_cancel(): + await stream.send('yo') + await portal.run(assert_state, value=True) + await ctx.cancel() + await portal.run(assert_state, value=False) + + async with trio.open_nursery() as n: + n.start_soon(send_msg_then_cancel) + + try: + async for msg in stream: + assert msg == 'yo' + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor.context +async def cancel_self( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.cancel() + try: + with trio.fail_after(0.1): + await trio.sleep_forever() + + except trio.Cancelled: + raise + + except trio.TooSlowError: + # should never get here + assert 0 + + +@tractor_test +async def test_callee_cancels_before_started(): + '''callee calls `Context.cancel()` while streaming and caller + sees stream terminated in `ContextCancelled`. 
+
+    '''
+    async with tractor.open_nursery() as n:
+
+        portal = await n.start_actor(
+            'cancels_self',
+            enable_modules=[__name__],
+        )
+        try:
+
+            async with portal.open_context(
+                cancel_self,
+            ) as (ctx, sent):
+                async with ctx.open_stream():
+
+                    await trio.sleep_forever()
+
+        # raises a special cancel signal
+        except tractor.ContextCancelled as ce:
+            assert ce.type == trio.Cancelled
+
+    # teardown the actor
+    await portal.cancel_actor()
+
+
 @tractor.context
 async def simple_rpc(
@@ -207,6 +491,8 @@ def test_simple_rpc(server_func, use_async_for):

             # stream should terminate here

+            # final context result(s) should be consumed here in __aexit__()
+
         await portal.cancel_actor()

     trio.run(main)

From 349d82d182949c857f0bf7c07aa3622e976af4a0 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 13 Jun 2021 20:26:01 -0400
Subject: [PATCH 24/50] Speed up the dynamic pubsub test

---
 tests/test_advanced_streaming.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py
index b5cf2b9..a4d36a9 100644
--- a/tests/test_advanced_streaming.py
+++ b/tests/test_advanced_streaming.py
@@ -33,9 +33,9 @@ async def publisher(
         for sub_stream in _registry[sub]:
             await sub_stream.send(val)

-        # throttle send rate to ~4Hz
+        # throttle send rate to ~1kHz
         # making it readable to a human user
-        await trio.sleep(1/4)
+        await trio.sleep(1/1000)


 @tractor.context
@@ -133,7 +133,7 @@ def test_dynamic_pub_sub():
             )

             # block until cancelled by user
-            with trio.fail_after(10):
+            with trio.fail_after(3):
                 await trio.sleep_forever()

         try:

From 8eb889a7455b23b120dc6c016424e3e6847469f6 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 13 Jun 2021 20:26:41 -0400
Subject: [PATCH 25/50] Modernize streaming tests

---
 tests/test_streaming.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/tests/test_streaming.py b/tests/test_streaming.py
index 43acb95..8c8d07e 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming.py
@@ -32,13 +32,16 @@ async def async_gen_stream(sequence):

     # block indefinitely waiting to be cancelled by ``aclose()`` call
     with trio.CancelScope() as cs:
-        await trio.sleep(float('inf'))
+        await trio.sleep_forever()
         assert 0
     assert cs.cancelled_caught


 @tractor.stream
-async def context_stream(ctx, sequence):
+async def context_stream(
+    ctx: tractor.Context,
+    sequence
+):
     for i in sequence:
         await ctx.send_yield(i)
         await trio.sleep(0.1)

From 3d633408fce91013ab5bfdd999be029170a1476a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 14 Jun 2021 13:30:09 -0400
Subject: [PATCH 26/50] Don't clobber msg loop mem chan on rx stream close

Revert this change since it really is poking at internals and doesn't
make a lot of sense. If the context is going to be cancelled then the
msg loop will tear down the feed memory channel when ready; we don't
need to be clobbering it and confusing the runtime machinery lol.

---
 tractor/_portal.py    | 6 ------
 tractor/_streaming.py | 7 +++----
 2 files changed, 3 insertions(+), 10 deletions(-)

diff --git a/tractor/_portal.py b/tractor/_portal.py
index 32e71b0..3d1bd9c 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -332,12 +332,6 @@ class Portal:
             # message right now since there shouldn't be a reason to
             # stop and restart the stream, right?
             try:
-
-                # We are for sure done with this stream and no more
-                # messages are expected to be delivered from the
-                # runtime's msg loop.
- await recv_chan.aclose() - await ctx.cancel() except trio.ClosedResourceError: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index ea01264..0d6b59d 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -61,7 +61,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return msg['yield'] async def receive(self): - # see ``.aclose()`` to an alt to always checking this + # see ``.aclose()`` for an alt to always checking this if self._eoc: raise trio.EndOfChannel @@ -80,7 +80,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if msg.get('stop'): log.debug(f"{self} was stopped at remote end") - self._eoc = True # when the send is closed we assume the stream has # terminated and signal this local iterator to stop @@ -150,6 +149,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): on close. """ + self._eoc = True + # XXX: keep proper adherance to trio's `.aclose()` semantics: # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan @@ -200,8 +201,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # it can't traverse the transport. log.debug(f'Channel for {self} was already closed') - self._eoc = True - # close the local mem chan??!? # NOT if we're a ``MsgStream``! From 1703171bead5e3d94f2ac2fbfd3eb7ebbe4c38ff Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jun 2021 16:34:44 -0400 Subject: [PATCH 27/50] Set stream "end of channel" after shielded check! Another face palm that was causing serious issues for code that is using the `.shielded` feature.. Add a bunch more detailed comments for all this subtlety and hopefully get it right once and for all. Also aggregated the `trio` errors that should trigger closure inside `.aclose()`, hopefully that's right too. --- tractor/_streaming.py | 122 ++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 46 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 0d6b59d..6b544d0 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -61,7 +61,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return msg['yield'] async def receive(self): - # see ``.aclose()`` for an alt to always checking this + # see ``.aclose()`` for notes on the old behaviour prior to + # introducing this if self._eoc: raise trio.EndOfChannel @@ -81,12 +82,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if msg.get('stop'): log.debug(f"{self} was stopped at remote end") - # when the send is closed we assume the stream has - # terminated and signal this local iterator to stop - await self.aclose() + # # when the send is closed we assume the stream has + # # terminated and signal this local iterator to stop + # await self.aclose() # XXX: this causes ``ReceiveChannel.__anext__()`` to - # raise a ``StopAsyncIteration``. + # raise a ``StopAsyncIteration`` **and** in our catch + # block below it will trigger ``.aclose()``. raise trio.EndOfChannel # TODO: test that shows stream raising an expected error!!! @@ -97,24 +99,34 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): else: raise - except trio.ClosedResourceError: - # XXX: this indicates that a `stop` message was - # sent by the far side of the underlying channel. 
- # Currently this is triggered by calling ``.aclose()`` on + except ( + trio.ClosedResourceError, # by self._rx_chan + trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end + trio.Cancelled, # by local cancellation + ): + # XXX: we close the stream on any of these error conditions: + + # a ``ClosedResourceError`` indicates that the internal + # feeder memory receive channel was closed likely by the + # runtime after the associated transport-channel + # disconnected or broke. + + # an ``EndOfChannel`` indicates either the internal recv + # memchan exhausted **or** we raisesd it just above after + # receiving a `stop` message from the far end of the stream. + + # Previously this was triggered by calling ``.aclose()`` on # the send side of the channel inside - # ``Actor._push_result()``, but maybe it should be put here? - # to avoid exposing the internal mem chan closing mechanism? - # in theory we could instead do some flushing of the channel - # if needed to ensure all consumers are complete before - # triggering closure too early? + # ``Actor._push_result()`` (should still be commented code + # there - which should eventually get removed), but now the + # 'stop' message handling has been put just above. - # Locally, we want to close this stream gracefully, by + # TODO: Locally, we want to close this stream gracefully, by # terminating any local consumers tasks deterministically. - # We **don't** want to be closing this send channel and not - # relaying a final value to remaining consumers who may not - # have been scheduled to receive it yet? - - # lots of testing to do here + # One we have broadcast support, we **don't** want to be + # closing this stream and not flushing a final value to + # remaining (clone) consumers who may not have been + # scheduled to receive it yet. # when the send is closed we assume the stream has # terminated and signal this local iterator to stop @@ -122,10 +134,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise # propagate - except trio.Cancelled: - # relay cancels to the remote task - await self.aclose() - raise + # except trio.Cancelled: + # # relay cancels to the remote task + # await self.aclose() + # raise @contextmanager def shield( @@ -149,8 +161,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): on close. """ - self._eoc = True - # XXX: keep proper adherance to trio's `.aclose()` semantics: # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan @@ -175,6 +185,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): log.warning(f"{self} is shielded, portal channel being kept alive") return + # XXX: This must be set **AFTER** the shielded test above! + self._eoc = True + # NOTE: this is super subtle IPC messaging stuff: # Relay stop iteration to far end **iff** we're # in bidirectional mode. If we're only streaming @@ -186,9 +199,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # isn't expecting messages to be sent by the caller. # Thus, we must check that this context DOES NOT # have a portal reference to ensure this is indeed the callee - # side and can relay a 'stop'. In the bidirectional case, - # `Context.open_stream()` will create the `Actor._cids2qs` - # entry from a call to `Actor.get_memchans()`. + # side and can relay a 'stop'. 
+ + # In the bidirectional case, `Context.open_stream()` will create + # the `Actor._cids2qs` entry from a call to + # `Actor.get_memchans()` and will send the stop message in + # ``__aexit__()`` on teardown so it **does not** need to be + # called here. if not self._ctx._portal: try: # only for 2 way streams can we can send @@ -201,32 +218,41 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # it can't traverse the transport. log.debug(f'Channel for {self} was already closed') - # close the local mem chan??!? + # close the local mem chan ``self._rx_chan`` ??!? - # NOT if we're a ``MsgStream``! + # DEFINITELY NOT if we're a bi-dir ``MsgStream``! # BECAUSE this same core-msg-loop mem recv-chan is used to deliver # the potential final result from the surrounding inter-actor # `Context` so we don't want to close it until that context has # run to completion. - # XXX: Notes on old behaviour. + # XXX: Notes on old behaviour: + # await rx_chan.aclose() - # In the receive-only case, ``Portal.open_stream_from()`` should - # call this explicitly on teardown but additionally if for some - # reason stream consumer code tries to manually receive a new + # In the receive-only case, ``Portal.open_stream_from()`` used + # to rely on this call explicitly on teardown such that a new + # call to ``.receive()`` after ``rx_chan`` had been closed, would + # result in us raising a ``trio.EndOfChannel`` (since we + # remapped the ``trio.ClosedResourceError`). However, now if for some + # reason the stream's consumer code tries to manually receive a new # value before ``.aclose()`` is called **but** the far end has # stopped `.receive()` **must** raise ``trio.EndofChannel`` in - # order to avoid an infinite hang on ``.__anext__()``. So we can - # instead uncomment this check and close the underlying msg-loop - # mem chan below and not then **not** check for ``self._eoc`` in - # ``.receive()`` (if for some reason we think that check is - # a bottle neck - not likely) such that the - # ``trio.ClosedResourceError`` would instead trigger the - # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was - # before bi-dir streaming support). + # order to avoid an infinite hang on ``.__anext__()``; this is + # why we added ``self._eoc`` to denote stream closure indepedent + # of ``rx_chan``. - # if not isinstance(self, MsgStream): - # await rx_chan.aclose() + # In theory we could still use this old method and close the + # underlying msg-loop mem chan as above and then **not** check + # for ``self._eoc`` in ``.receive()`` (if for some reason we + # think that check is a bottle neck - not likely) **but** then + # we would need to map the resulting + # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in + # ``.receive()`` (as it originally was before bi-dir streaming + # support) in order to trigger stream closure. The old behaviour + # is arguably more confusing since we lose detection of the + # runtime's closure of ``rx_chan`` in the case where we may + # still need to consume msgs that are "in transit" from the far + # end (eg. for ``Context.result()``). # TODO: but make it broadcasting to consumers # def clone(self): @@ -251,6 +277,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): self, data: Any ) -> None: + '''Send a message over this stream to the far end. 
+ + ''' await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -409,7 +438,8 @@ class Context: yield rchan except trio.EndOfChannel: - # stream iteration stop signal + # likely the far end sent us a 'stop' message to + # terminate the stream. raise else: From af701c16ee2484c5bcbd051fe4179c737713b5c3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Jun 2021 19:56:05 -0400 Subject: [PATCH 28/50] Consider relaying context error via raised-in-scope-nursery task --- tractor/_actor.py | 76 ++++++++++++++++++++------ tractor/_portal.py | 123 ++++++++++++++++++++++++------------------ tractor/_streaming.py | 59 +++++++++++++------- 3 files changed, 172 insertions(+), 86 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 4b215f8..f4711fb 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -46,6 +46,7 @@ class ActorFailure(Exception): async def _invoke( + actor: 'Actor', cid: str, chan: Channel, @@ -58,10 +59,12 @@ async def _invoke( """Invoke local func and deliver result(s) over provided channel. """ treat_as_gen = False - cs = None + cancel_scope = trio.CancelScope() - ctx = Context(chan, cid, _cancel_scope=cancel_scope) - context = False + cs: trio.CancelScope = None + + ctx = Context(chan, cid) + context: bool = False if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions @@ -149,14 +152,22 @@ async def _invoke( # context func with support for bi-dir streaming await chan.send({'functype': 'context', 'cid': cid}) - with cancel_scope as cs: + async with trio.open_nursery() as scope_nursery: + ctx._scope_nursery = scope_nursery + cs = scope_nursery.cancel_scope task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) if cs.cancelled_caught: + if ctx._cancel_called: + msg = f'{func.__name__} cancelled itself', + + else: + msg = f'{func.__name__} was remotely cancelled', + # task-contex was cancelled so relay to the cancel to caller raise ContextCancelled( - f'{func.__name__} cancelled itself', + msg, suberror_type=trio.Cancelled, ) @@ -191,8 +202,10 @@ async def _invoke( await chan.send(err_msg) except trio.ClosedResourceError: - log.warning( - f"Failed to ship error to caller @ {chan.uid}") + # if we can't propagate the error that's a big boo boo + log.error( + f"Failed to ship error to caller @ {chan.uid} !?" + ) if cs is None: # error is from above code not from rpc invocation @@ -372,12 +385,16 @@ class Actor: raise mne async def _stream_handler( + self, stream: trio.SocketStream, + ) -> None: """Entry point for new inbound connections to the channel server. + """ self._no_more_peers = trio.Event() # unset + chan = Channel(stream=stream) log.info(f"New connection to us {chan}") @@ -423,10 +440,24 @@ class Actor: try: await self._process_messages(chan) finally: + + # channel cleanup sequence + + # for (channel, cid) in self._rpc_tasks.copy(): + # if channel is chan: + # with trio.CancelScope(shield=True): + # await self._cancel_task(cid, channel) + + # # close all consumer side task mem chans + # send_chan, _ = self._cids2qs[(chan.uid, cid)] + # assert send_chan.cid == cid # type: ignore + # await send_chan.aclose() + # Drop ref to channel so it can be gc-ed and disconnected log.debug(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) + if not chans: log.debug(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) @@ -439,14 +470,22 @@ class Actor: # # XXX: is this necessary (GC should do it?) 
if chan.connected(): + # if the channel is still connected it may mean the far + # end has not closed and we may have gotten here due to + # an error and so we should at least try to terminate + # the channel from this end gracefully. + log.debug(f"Disconnecting channel {chan}") try: - # send our msg loop terminate sentinel + # send a msg loop terminate sentinel await chan.send(None) + + # XXX: do we want this? + # causes "[104] connection reset by peer" on other end # await chan.aclose() + except trio.BrokenResourceError: - log.exception( - f"Channel for {chan.uid} was already zonked..") + log.warning(f"Channel for {chan.uid} was already closed") async def _push_result( self, @@ -456,18 +495,22 @@ class Actor: ) -> None: """Push an RPC result to the local consumer's queue. """ - actorid = chan.uid - assert actorid, f"`actorid` can't be {actorid}" - send_chan, recv_chan = self._cids2qs[(actorid, cid)] + # actorid = chan.uid + assert chan.uid, f"`chan.uid` can't be {chan.uid}" + send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] assert send_chan.cid == cid # type: ignore - # if 'stop' in msg: + if 'error' in msg: + ctx = getattr(recv_chan, '_ctx', None) + # if ctx: + # ctx._error_from_remote_msg(msg) + # log.debug(f"{send_chan} was terminated at remote end") # # indicate to consumer that far end has stopped # return await send_chan.aclose() try: - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") # maintain backpressure await send_chan.send(msg) @@ -486,7 +529,9 @@ class Actor: self, actorid: Tuple[str, str], cid: str + ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: + log.debug(f"Getting result queue for {actorid} cid {cid}") try: send_chan, recv_chan = self._cids2qs[(actorid, cid)] @@ -548,6 +593,7 @@ class Actor: log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") + break log.trace( # type: ignore diff --git a/tractor/_portal.py b/tractor/_portal.py index 3d1bd9c..ed5892d 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -177,6 +177,7 @@ class Portal: f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): try: + # with trio.CancelScope(shield=True): await stream.aclose() except trio.ClosedResourceError: # don't error the stream having already been closed @@ -369,64 +370,78 @@ class Portal: recv_chan: Optional[trio.MemoryReceiveChannel] = None + cid, recv_chan, functype, first_msg = await self._submit( + fn_mod_path, fn_name, kwargs) + + assert functype == 'context' + msg = await recv_chan.receive() + try: - cid, recv_chan, functype, first_msg = await self._submit( - fn_mod_path, fn_name, kwargs) + # the "first" value here is delivered by the callee's + # ``Context.started()`` call. + first = msg['started'] - assert functype == 'context' - msg = await recv_chan.receive() + except KeyError: + assert msg.get('cid'), ("Received internal error at context?") - try: - # the "first" value here is delivered by the callee's - # ``Context.started()`` call. - first = msg['started'] - - except KeyError: - assert msg.get('cid'), ("Received internal error at context?") - - if msg.get('error'): - # raise the error message - raise unpack_error(msg, self.channel) - else: - raise - - # deliver context instance and .started() msg value in open - # tuple. 
- ctx = Context( - self.channel, - cid, - _portal=self, - _recv_chan=recv_chan, - ) - - try: - yield ctx, first - - if cancel_on_exit: - await ctx.cancel() - - else: - if not ctx._cancel_called: - await ctx.result() - - except ContextCancelled: - # if the context was cancelled by client code - # then we don't need to raise since user code - # is expecting this. - if not ctx._cancel_called: - raise - - except BaseException: - # the context cancels itself on any deviation - await ctx.cancel() + if msg.get('error'): + # raise the error message + raise unpack_error(msg, self.channel) + else: raise - finally: - log.info(f'Context for {func.__name__} completed') + # deliver context instance and .started() msg value in open + # tuple. + try: + async with trio.open_nursery() as scope_nursery: + ctx = Context( + self.channel, + cid, + _portal=self, + _recv_chan=recv_chan, + _scope_nursery=scope_nursery, + ) + recv_chan._ctx = ctx + + yield ctx, first + + log.info(f'Context for {func.__name__} completed') + + if cancel_on_exit: + await ctx.cancel() + + else: + if not ctx._cancel_called: + await ctx.result() + + await recv_chan.aclose() + + # except TypeError: + # # if fn_name == '_emsd_main': + # import tractor + # await tractor.breakpoint() + + except ContextCancelled: + if not ctx._cancel_called: + raise + + # if the context was cancelled by client code + # then we don't need to raise since user code + # is expecting this and the block should exit. + else: + log.debug(f'Context {ctx} cancelled gracefully') + + except trio.Cancelled: + # the context cancels itself on any deviation + await ctx.cancel() + raise + + # finally: + # log.info(f'Context for {func.__name__} completed') + + # finally: + # if recv_chan is not None: - finally: - if recv_chan is not None: - await recv_chan.aclose() @dataclass class LocalPortal: @@ -464,6 +479,7 @@ async def open_portal( was_connected = False async with maybe_open_nursery(nursery, shield=shield) as nursery: + if not channel.connected(): await channel.connect() was_connected = True @@ -485,11 +501,12 @@ async def open_portal( portal = Portal(channel) try: yield portal + finally: await portal.aclose() if was_connected: - # cancel remote channel-msg loop + # gracefully signal remote channel-msg loop await channel.send(None) # cancel background msg loop task diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 6b544d0..beb5eb2 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass from typing import ( Any, Iterator, Optional, Callable, - AsyncGenerator, + AsyncGenerator, Dict, ) import warnings @@ -67,6 +67,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise trio.EndOfChannel try: + # if self._ctx.chan.uid[0] == 'brokerd.ib': + # breakpoint() + msg = await self._rx_chan.receive() return msg['yield'] @@ -134,6 +137,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise # propagate + # except trio.Cancelled: + # if not self._shielded: + # # if shielded we don't propagate a cancelled + # raise + # except trio.Cancelled: # # relay cancels to the remote task # await self.aclose() @@ -165,7 +173,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan - if rx_chan._closed: + if rx_chan._closed: # or self._eoc: log.warning(f"{self} is already closed") # this stream has already been closed so silently succeed as @@ -212,7 
+220,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # stop from the caller side await self._ctx.send_stop() - except trio.BrokenResourceError: + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): # the underlying channel may already have been pulled # in which case our stop message is meaningless since # it can't traverse the transport. @@ -254,18 +265,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). - # TODO: but make it broadcasting to consumers - # def clone(self): - # """Clone this receive channel allowing for multi-task - # consumption from the same channel. - - # """ - # return ReceiveStream( - # self._cid, - # self._rx_chan.clone(), - # self._portal, - # ) - class MsgStream(ReceiveMsgStream, trio.abc.Channel): """ @@ -282,6 +281,17 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) + # TODO: but make it broadcasting to consumers + def clone(self): + """Clone this receive channel allowing for multi-task + consumption from the same channel. + + """ + return MsgStream( + self._ctx, + self._rx_chan.clone(), + ) + @dataclass class Context: @@ -308,7 +318,7 @@ class Context: _cancel_called: bool = False # only set on the callee side - _cancel_scope: Optional[trio.CancelScope] = None + _scope_nursery: Optional[trio.Nursery] = None async def send_yield(self, data: Any) -> None: @@ -323,6 +333,16 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) + def _error_from_remote_msg( + self, + msg: Dict[str, Any], + + ) -> None: + async def raiser(): + raise unpack_error(msg, self.chan) + + self._scope_nursery.start_soon(raiser) + async def cancel(self) -> None: '''Cancel this inter-actor-task context. @@ -361,13 +381,16 @@ class Context: f"{cid} for {self._portal.channel.uid}") else: # ensure callee side - assert self._cancel_scope + assert self._scope_nursery # TODO: should we have an explicit cancel message # or is relaying the local `trio.Cancelled` as an # {'error': trio.Cancelled, cid: "blah"} enough? # This probably gets into the discussion in # https://github.com/goodboy/tractor/issues/36 - self._cancel_scope.cancel() + self._scope_nursery.cancel_scope.cancel() + + if self._recv_chan: + await self._recv_chan.aclose() @asynccontextmanager async def open_stream( From 3423ea40111bb1d907522661e6af34fa6202829c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 27 Jun 2021 00:46:36 -0400 Subject: [PATCH 29/50] Add temp warning msg for context cancel call --- tractor/_streaming.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index beb5eb2..976180f 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -350,6 +350,8 @@ class Context: Timeout quickly in an attempt to sidestep 2-generals... 
''' + log.warning(f'Cancelling caller side of context {self}') + self._cancel_called = True if self._portal: # caller side: From c2484e88a1ab73de8729c1a2bdb640a8a485d416 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 27 Jun 2021 11:37:35 -0400 Subject: [PATCH 30/50] First try: pack cancelled tracebacks and ship to caller --- tractor/_actor.py | 49 +++++++++++++++++++++++++++++------------- tractor/_exceptions.py | 13 +++++++++-- 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index f4711fb..b514d8c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -60,6 +60,9 @@ async def _invoke( """ treat_as_gen = False + # possible a traceback (not sure what typing is for this..) + tb = None + cancel_scope = trio.CancelScope() cs: trio.CancelScope = None @@ -156,14 +159,26 @@ async def _invoke( ctx._scope_nursery = scope_nursery cs = scope_nursery.cancel_scope task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + try: + await chan.send({'return': await coro, 'cid': cid}) + except trio.Cancelled as err: + tb = err.__traceback__ if cs.cancelled_caught: - if ctx._cancel_called: - msg = f'{func.__name__} cancelled itself', - else: - msg = f'{func.__name__} was remotely cancelled', + # TODO: pack in ``trio.Cancelled.__traceback__`` here + # so they can be unwrapped and displayed on the caller + # side! + + fname = func.__name__ + if ctx._cancel_called: + msg = f'{fname} cancelled itself' + + elif cs.cancel_called: + msg = ( + f'{fname} was remotely cancelled by its caller ' + f'{ctx.chan.uid}' + ) # task-contex was cancelled so relay to the cancel to caller raise ContextCancelled( @@ -196,7 +211,7 @@ async def _invoke( log.exception("Actor crashed:") # always ship errors back to caller - err_msg = pack_error(err) + err_msg = pack_error(err, tb=tb) err_msg['cid'] = cid try: await chan.send(err_msg) @@ -223,7 +238,7 @@ async def _invoke( f"Task {func} likely errored or cancelled before it started") finally: if not actor._rpc_tasks: - log.info("All RPC tasks have completed") + log.runtime("All RPC tasks have completed") actor._ongoing_rpc_tasks.set() @@ -238,10 +253,10 @@ _lifetime_stack: ExitStack = ExitStack() class Actor: """The fundamental concurrency primitive. - An *actor* is the combination of a regular Python or - ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + An *actor* is the combination of a regular Python process + executing a ``trio`` task tree, communicating with other actors through "portals" which provide a native async API - around "channels". + around various IPC transport "channels". """ is_arbiter: bool = False @@ -396,7 +411,7 @@ class Actor: self._no_more_peers = trio.Event() # unset chan = Channel(stream=stream) - log.info(f"New connection to us {chan}") + log.runtime(f"New connection to us {chan}") # send/receive initial handshake response try: @@ -427,8 +442,12 @@ class Actor: event.set() chans = self._peers[uid] + + # TODO: re-use channels for new connections instead + # of always new ones; will require changing all the + # discovery funcs if chans: - log.warning( + log.runtime( f"already have channel(s) for {uid}:{chans}?" 
) log.trace(f"Registered {chan} for {uid}") # type: ignore @@ -664,7 +683,7 @@ class Actor: else: # mark that we have ongoing rpc tasks self._ongoing_rpc_tasks = trio.Event() - log.info(f"RPC func is {func}") + log.runtime(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested self._rpc_tasks[(chan, cid)] = ( @@ -673,7 +692,7 @@ class Actor: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` log.warning( - f"{self.uid} was remotely cancelled; " + f"Actor {self.uid} was remotely cancelled; " "waiting on cancellation completion..") await self._cancel_complete.wait() loop_cs.cancel() @@ -1141,7 +1160,7 @@ class Actor: raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index d213a20..30c872b 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -56,13 +56,22 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" -def pack_error(exc: BaseException) -> Dict[str, Any]: +def pack_error( + exc: BaseException, + tb = None, + +) -> Dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). """ + if tb: + tb_str = ''.join(traceback.format_tb(tb)) + else: + tb_str = traceback.format_exc() + return { 'error': { - 'tb_str': traceback.format_exc(), + 'tb_str': tb_str, 'type_str': type(exc).__name__, } } From 91640facbccc09325ed2c6eb004d17c08d5d0dd0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Jun 2021 00:18:28 -0400 Subject: [PATCH 31/50] Always shield cancel the caller on cancel-causing-errors, add teardown logging --- tractor/_portal.py | 72 +++++++++++++++++++++++++++++-------------- tractor/_streaming.py | 20 +++++++----- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index ed5892d..4471c8f 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -295,6 +295,7 @@ class Portal: self, async_gen_func: Callable, # typing: ignore **kwargs, + ) -> AsyncGenerator[ReceiveMsgStream, None]: if not inspect.isasyncgenfunction(async_gen_func): @@ -347,7 +348,6 @@ class Portal: self, func: Callable, - cancel_on_exit: bool = False, **kwargs, ) -> AsyncGenerator[Tuple[Context, Any], None]: @@ -359,6 +359,7 @@ class Portal: and synchronized final result collection. See ``tractor.Context``. ''' + # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -390,6 +391,7 @@ class Portal: else: raise + _err = None # deliver context instance and .started() msg value in open # tuple. 
try: @@ -403,26 +405,20 @@ class Portal: ) recv_chan._ctx = ctx + # await trio.lowlevel.checkpoint() yield ctx, first - log.info(f'Context for {func.__name__} completed') + # if not ctx._cancel_called: + # await ctx.result() - if cancel_on_exit: - await ctx.cancel() + # await recv_chan.aclose() - else: - if not ctx._cancel_called: - await ctx.result() - - await recv_chan.aclose() - - # except TypeError: - # # if fn_name == '_emsd_main': - # import tractor - # await tractor.breakpoint() - - except ContextCancelled: + except ContextCancelled as err: + _err = err if not ctx._cancel_called: + # context was cancelled at the far end but was + # not part of this end requesting that cancel + # so raise for the local task to respond and handle. raise # if the context was cancelled by client code @@ -431,16 +427,43 @@ class Portal: else: log.debug(f'Context {ctx} cancelled gracefully') - except trio.Cancelled: - # the context cancels itself on any deviation - await ctx.cancel() + except ( + trio.Cancelled, + trio.MultiError, + Exception, + ) as err: + _err = err + # the context cancels itself on any cancel + # causing error. + log.error(f'Context {ctx} sending cancel to far end') + with trio.CancelScope(shield=True): + await ctx.cancel() raise - # finally: - # log.info(f'Context for {func.__name__} completed') + finally: + result = await ctx.result() - # finally: - # if recv_chan is not None: + # though it should be impossible for any tasks + # operating *in* this scope to have survived + # we tear down the runtime feeder chan last + # to avoid premature stream clobbers. + if recv_chan is not None: + await recv_chan.aclose() + + if _err: + if ctx._cancel_called: + log.warning( + f'Context {fn_name} cancelled by caller with\n{_err}' + ) + elif _err is not None: + log.warning( + f'Context {fn_name} cancelled by callee with\n{_err}' + ) + else: + log.info( + f'Context {fn_name} returned ' + f'value from callee `{self._result}`' + ) @dataclass @@ -465,10 +488,12 @@ class LocalPortal: @asynccontextmanager async def open_portal( + channel: Channel, nursery: Optional[trio.Nursery] = None, start_msg_loop: bool = True, shield: bool = False, + ) -> AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -508,6 +533,7 @@ async def open_portal( if was_connected: # gracefully signal remote channel-msg loop await channel.send(None) + # await channel.aclose() # cancel background msg loop task if msg_loop_cs: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 976180f..d69bd44 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -67,8 +67,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise trio.EndOfChannel try: - # if self._ctx.chan.uid[0] == 'brokerd.ib': - # breakpoint() msg = await self._rx_chan.receive() return msg['yield'] @@ -173,7 +171,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan - if rx_chan._closed: # or self._eoc: + if rx_chan._closed: # or self._eoc: log.warning(f"{self} is already closed") # this stream has already been closed so silently succeed as @@ -338,6 +336,12 @@ class Context: msg: Dict[str, Any], ) -> None: + '''Unpack and raise a msg error into the local scope + nursery for this context. + + Acts as a form of "relay" for a remote error raised + in the corresponding remote callee task. 
+ ''' async def raiser(): raise unpack_error(msg, self.chan) @@ -350,11 +354,13 @@ class Context: Timeout quickly in an attempt to sidestep 2-generals... ''' - log.warning(f'Cancelling caller side of context {self}') + side = 'caller' if self._portal else 'callee' + + log.warning(f'Cancelling {side} side of context to {self.chan}') self._cancel_called = True - if self._portal: # caller side: + if side == 'caller': if not self._portal: raise RuntimeError( "No portal found, this is likely a callee side context" @@ -382,8 +388,8 @@ class Context: "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") else: - # ensure callee side - assert self._scope_nursery + # callee side remote task + # TODO: should we have an explicit cancel message # or is relaying the local `trio.Cancelled` as an # {'error': trio.Cancelled, cid: "blah"} enough? From c6cdaf9c31bdeb0bcde5a410b2cea0444596ff7a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Jun 2021 08:45:09 -0400 Subject: [PATCH 32/50] De-densify some code --- tractor/_discovery.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 9e520b3..bcfcc84 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -16,12 +16,14 @@ from ._state import current_actor, _runtime_vars @asynccontextmanager async def get_arbiter( + host: str, port: int, + ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: - """Return a portal instance connected to a local or remote + '''Return a portal instance connected to a local or remote arbiter. - """ + ''' actor = current_actor() if not actor: @@ -33,16 +35,20 @@ async def get_arbiter( yield LocalPortal(actor, Channel((host, port))) else: async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal @asynccontextmanager async def get_root( -**kwargs, + **kwargs, ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + host, port = _runtime_vars['_root_mailbox'] assert host is not None + async with _connect_chan(host, port) as chan: async with open_portal(chan, **kwargs) as portal: yield portal @@ -60,12 +66,16 @@ async def find_actor( """ actor = current_actor() async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just # the last one that registered if name == 'arbiter' and actor.is_arbiter: raise RuntimeError("The current actor is the arbiter") + elif sockaddr: + async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal @@ -83,9 +93,12 @@ async def wait_for_actor( A portal to the first registered actor is returned. 
""" actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal From b21e2a6caaf8bc7e6e78497b103f5156fd53b592 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Jun 2021 13:47:38 -0400 Subject: [PATCH 33/50] Add pre-stream open error conditions --- tractor/_streaming.py | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index d69bd44..f23376a 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -15,7 +15,7 @@ import warnings import trio from ._ipc import Channel -from ._exceptions import unpack_error +from ._exceptions import unpack_error, ContextCancelled from ._state import current_actor from .log import get_logger @@ -135,16 +135,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise # propagate - # except trio.Cancelled: - # if not self._shielded: - # # if shielded we don't propagate a cancelled - # raise - - # except trio.Cancelled: - # # relay cancels to the remote task - # await self.aclose() - # raise - @contextmanager def shield( self @@ -171,7 +161,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan - if rx_chan._closed: # or self._eoc: + if rx_chan._closed: log.warning(f"{self} is already closed") # this stream has already been closed so silently succeed as @@ -440,19 +430,25 @@ class Context: self.cid ) - # XXX: If the underlying receive mem chan has been closed then - # likely client code has already exited a ``.open_stream()`` - # block prior. we error here until such a time that we decide - # allowing streams to be "re-connected" is supported and/or - # a good idea. - if recv_chan._closed: + # Likewise if the surrounding context has been cancelled we error here + # since it likely means the surrounding block was exited or + # killed + + if self._cancel_called: task = trio.lowlevel.current_task().name - raise trio.ClosedResourceError( - f'stream for {actor.uid[0]}:{task} has already been closed.' - '\nRe-opening a closed stream is not yet supported!' - '\nConsider re-calling the containing `@tractor.context` func' + raise ContextCancelled( + f'Context around {actor.uid[0]}:{task} was already cancelled!' ) + # XXX: If the underlying channel feeder receive mem chan has + # been closed then likely client code has already exited + # a ``.open_stream()`` block prior or there was some other + # unanticipated error or cancellation from ``trio``. 
From 0623de0b47d60b2ff6645fa1754f72ff4208cbac Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 30 Jun 2021 13:52:04 -0400
Subject: [PATCH 34/50] Expect context cancelled when we cancel

---
 tests/test_2way.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_2way.py b/tests/test_2way.py
index 716e2ce..1ef05d2 100644
--- a/tests/test_2way.py
+++ b/tests/test_2way.py
@@ -262,7 +262,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
                 async with ctx.open_stream() as stream:
                     async for msg in stream:
                         pass
-            except trio.ClosedResourceError:
+            except tractor.ContextCancelled:
                 pass
             else:
                 assert 0, "Should have received closed resource error?"

From a134bc490fb24ee8ecf0b1ce54a14d102b7d91bd Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 30 Jun 2021 13:52:31 -0400
Subject: [PATCH 35/50] Avoid mutate during iterate error

---
 tests/test_advanced_streaming.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py
index a4d36a9..4429d25 100644
--- a/tests/test_advanced_streaming.py
+++ b/tests/test_advanced_streaming.py
@@ -30,7 +30,7 @@ async def publisher(

         sub = 'even' if is_even(val) else 'odd'

-        for sub_stream in _registry[sub]:
+        for sub_stream in _registry[sub].copy():
             await sub_stream.send(val)

         # throttle send rate to ~1kHz
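The ``.copy()`` in the hunk above is load-bearing: mutating a list (say from
a task that unsubscribes a stream) while another task iterates it makes the
loop silently skip entries. A tiny pure-python demo of the hazard:

.. code:: python

    subs = ['a', 'b', 'c']

    seen = []
    for s in subs.copy():
        seen.append(s)
        if s == 'a':
            # e.g. a consumer unsubscribing mid-broadcast
            subs.remove('a')

    # iterating the copy visits every original entry; looping over
    # ``subs`` directly would have skipped 'b'
    assert seen == ['a', 'b', 'c']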
From ef725c597267fbfe3e6c4b1576e5741e798075a3 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 25 Jun 2021 20:52:08 -0400
Subject: [PATCH 36/50] Always hard kill sub-procs on teardown

Adds a new hard kill routine for the `trio` spawning backend.
---
 tractor/_spawn.py | 52 +++++++++++++++++++++++++++++++----------------
 1 file changed, 35 insertions(+), 17 deletions(-)

diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index d893479..ca66de1 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -22,7 +22,10 @@ from multiprocessing import forkserver  # type: ignore
 from typing import Tuple

 from . import _forkserver_override
-from ._state import current_actor, is_main_process
+from ._state import (
+    current_actor,
+    is_main_process,
+)
 from .log import get_logger
 from ._portal import Portal
 from ._actor import Actor, ActorFailure
@@ -149,6 +152,27 @@ async def cancel_on_completion(
         await portal.cancel_actor()


+async def do_hard_kill(
+    proc: trio.Process,
+) -> None:
+    # NOTE: this timeout used to do nothing since we were shielding
+    # the ``.wait()`` inside ``new_proc()`` which will pretty much
+    # never release until the process exits, now it acts as
+    # a hard-kill time ultimatum.
+    with trio.move_on_after(3) as cs:
+
+        # NOTE: This ``__aexit__()`` shields internally.
+        async with proc:  # calls ``trio.Process.aclose()``
+            log.debug(f"Terminating {proc}")
+
+    if cs.cancelled_caught:
+        # XXX: should pretty much never get here unless we have
+        # to move the bits from ``proc.__aexit__()`` out and
+        # into here.
+        log.critical(f"HARD KILLING {proc}")
+        proc.kill()
+
+
 @asynccontextmanager
 async def spawn_subactor(
     subactor: 'Actor',
@@ -180,26 +204,15 @@ async def spawn_subactor(
     proc = await trio.open_process(spawn_cmd)
     try:
         yield proc
+
     finally:
+        # XXX: do this **after** cancellation/teardown
         # to avoid killing the process too early
         # since trio does this internally on ``__aexit__()``

-        # NOTE: we always "shield" join sub procs in
-        # the outer scope since no actor zombies are
-        # ever allowed. This ``__aexit__()`` also shields
-        # internally.
         log.debug(f"Attempting to kill {proc}")
-
-        # NOTE: this timeout effectively does nothing right now since
-        # we are shielding the ``.wait()`` inside ``new_proc()`` which
-        # will pretty much never release until the process exits.
-        with trio.move_on_after(3) as cs:
-            async with proc:
-                log.debug(f"Terminating {proc}")
-        if cs.cancelled_caught:
-            log.critical(f"HARD KILLING {proc}")
-            proc.kill()
+        await do_hard_kill(proc)


 async def new_proc(
@@ -277,9 +290,14 @@ async def new_proc(
                     # reaping more stringently without the shield
                     # we used to have below...

-                    # always "hard" join sub procs:
-                    # no actor zombies allowed
                     # with trio.CancelScope(shield=True):
+                    #     async with proc:
+
+                    # Always "hard" join sub procs since no actor zombies
+                    # are allowed!
+
+                    # this is a "light" (cancellable) join, the hard join is
+                    # in the enclosing scope (see above).
                     await proc.wait()

                 log.debug(f"Joined {proc}")

From b1cd7fdedf2e1f4bb09a33030f3645ad9c3a8b64 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 30 May 2021 11:32:33 -0400
Subject: [PATCH 37/50] Don't shield on root cancel; it can cause hangs

---
 tractor/_root.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tractor/_root.py b/tractor/_root.py
index f5bd778..da4ba68 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -179,8 +179,7 @@ async def open_root_actor(

         finally:
             logger.info("Shutting down root actor")
-            with trio.CancelScope(shield=True):
-                await actor.cancel()
+            await actor.cancel()
     finally:
         _state._current_actor = None
         logger.info("Root actor terminated")

From 98bbf8e0df1d048a13ae1d85b5506e9bc074a834 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 5 Jul 2021 13:37:09 -0400
Subject: [PATCH 38/50] Move join event trigger to direct exit path

---
 tractor/_trionics.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index 894ab7d..eea3aae 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -252,6 +252,12 @@ async def _open_and_supervise_one_cancels_all_nursery(
                     f"Waiting on subactors {anursery._children} "
                     "to complete"
                 )
+
+                # Last bit before first nursery block ends in the case
+                # where we didn't error in the caller's scope
+                log.debug("Waiting on all subactors to complete")
+                anursery._join_procs.set()
+
             except BaseException as err:
                 # if the caller's scope errored then we activate our
                 # one-cancels-all supervisor strategy (don't
@@ -292,11 +298,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
                 else:
                     raise

-            # Last bit before first nursery block ends in the case
-            # where we didn't error in the caller's scope
-            log.debug("Waiting on all subactors to complete")
-            anursery._join_procs.set()
-
             # ria_nursery scope end

 # XXX: do we need a `trio.Cancelled` catch here as well?
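The new ``do_hard_kill()`` shape generalizes: give graceful exit a bounded
window, then ``SIGKILL`` unconditionally. A standalone sketch against the
same (2021-era) ``trio`` process api used above, where ``trio.Process``
still supports ``async with`` / ``.aclose()``:

.. code:: python

    import trio


    async def hard_kill(proc: trio.Process, timeout: float = 3) -> None:
        # graceful window: ``__aexit__()`` waits on process exit
        with trio.move_on_after(timeout) as cs:
            async with proc:
                pass

        if cs.cancelled_caught:
            # ultimatum expired: no zombies allowed
            proc.kill()


    async def main() -> None:
        proc = await trio.open_process(['sleep', '100'])
        proc.terminate()  # politely ask first
        await hard_kill(proc)


    if __name__ == '__main__':
        trio.run(main)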
From 12f987514d19de964b986e2375b7cb57f175ed2f Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 5 Jul 2021 14:03:14 -0400
Subject: [PATCH 39/50] Don't enter debug on closed resource errors

---
 tractor/_actor.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index b514d8c..53ac484 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -197,9 +197,11 @@ async def _invoke(

         # TODO: maybe we'll want differnet "levels" of debugging
         # eventualy such as ('app', 'supervisory', 'runtime') ?
-        if not isinstance(err, trio.ClosedResourceError) and (
-            not is_multi_cancelled(err)) and (
-            not isinstance(err, ContextCancelled)
+        if (
+            not is_multi_cancelled(err) and (
+                not isinstance(err, ContextCancelled) or
+                (isinstance(err, ContextCancelled) and ctx._cancel_called)
+            )
         ):
             # XXX: is there any case where we'll want to debug IPC
             # disconnects? I can't think of a reason that inspecting
@@ -443,7 +445,7 @@ class Actor:

             chans = self._peers[uid]

-            # TODO: re-use channels for new connections instead
+            # TODO: re-use channels for new connections instead
             # of always new ones; will require changing all the
             # discovery funcs
             if chans:
@@ -519,10 +521,10 @@ class Actor:
         send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
         assert send_chan.cid == cid  # type: ignore

-        if 'error' in msg:
-            ctx = getattr(recv_chan, '_ctx', None)
-            # if ctx:
-            #     ctx._error_from_remote_msg(msg)
+        # if 'error' in msg:
+        #     ctx = getattr(recv_chan, '_ctx', None)
+        #     # if ctx:
+        #     #     ctx._error_from_remote_msg(msg)

             # log.debug(f"{send_chan} was terminated at remote end")
             # # indicate to consumer that far end has stopped

From 7f86d63e771a2042885480cf14c7bc8390ef23b5 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 5 Jul 2021 14:16:23 -0400
Subject: [PATCH 40/50] Drop trip kwarg

---
 tractor/_spawn.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index ca66de1..678250b 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -225,7 +225,6 @@ async def new_proc(
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child
     *,
-    use_trio_run_in_process: bool = False,
     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 ) -> None:
     """Create a new ``multiprocessing.Process`` using the
@@ -236,7 +235,7 @@ async def new_proc(
     # mark the new actor with the global spawn method
     subactor._spawn_method = _spawn_method

-    if use_trio_run_in_process or _spawn_method == 'trio':
+    if _spawn_method == 'trio':
         async with trio.open_nursery() as nursery:
             async with spawn_subactor(
                 subactor,
@@ -338,7 +337,6 @@ async def mp_new_proc(
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child
     *,
-    use_trio_run_in_process: bool = False,
     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 ) -> None:
From 9ddb6547838f9bdce23bfa13d0d68a061646d992 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 10 Jun 2021 13:59:08 -0400
Subject: [PATCH 41/50] Avoid mutate on iterate race

---
 tractor/_actor.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 53ac484..15dacf2 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -606,11 +606,14 @@ class Actor:
             task_status.started(loop_cs)
             async for msg in chan:
                 if msg is None:  # loop terminate sentinel
+
                     log.debug(
                         f"Cancelling all tasks for {chan} from {chan.uid}")
-                    for (channel, cid) in self._rpc_tasks:
+
+                    for (channel, cid) in self._rpc_tasks.copy():
                         if channel is chan:
                             await self._cancel_task(cid, channel)
+
                     log.debug(
                         f"Msg loop signalled to terminate for"
                         f" {chan} from {chan.uid}")

From 2513c652c1d0366ca02ab445620b2e24d250fffc Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 29 Jun 2021 15:55:03 -0400
Subject: [PATCH 42/50] Go back to only logging crashes if no pdb gets engaged

---
 tractor/_actor.py | 41 ++++++++++++++++++++++++++---------------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 15dacf2..fd7d231 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -195,21 +195,32 @@ async def _invoke(

     except (Exception, trio.MultiError) as err:

-        # TODO: maybe we'll want differnet "levels" of debugging
-        # eventualy such as ('app', 'supervisory', 'runtime') ?
-        if (
-            not is_multi_cancelled(err) and (
-                not isinstance(err, ContextCancelled) or
-                (isinstance(err, ContextCancelled) and ctx._cancel_called)
-            )
-        ):
-            # XXX: is there any case where we'll want to debug IPC
-            # disconnects? I can't think of a reason that inspecting
-            # this type of failure will be useful for respawns or
-            # recovery logic - the only case is some kind of strange bug
-            # in `trio` itself?
-            entered = await _debug._maybe_enter_pm(err)
-            if not entered:
+        if not is_multi_cancelled(err):
+
+            log.exception("Actor crashed:")
+
+            # TODO: maybe we'll want different "levels" of debugging
+            # eventually such as ('app', 'supervisory', 'runtime') ?
+
+            # if not isinstance(err, trio.ClosedResourceError) and (
+            # if not is_multi_cancelled(err) and (
+
+            entered_debug: bool = False
+            if not isinstance(err, ContextCancelled) or (
+                isinstance(err, ContextCancelled) and ctx._cancel_called
+            ):
+                # XXX: is there any case where we'll want to debug IPC
+                # disconnects as a default?
+                #
+                # I can't think of a reason that inspecting
+                # this type of failure will be useful for respawns or
+                # recovery logic - the only case is some kind of strange bug
+                # in our transport layer itself? Going to keep this
+                # open ended for now.
+
+                entered_debug = await _debug._maybe_enter_pm(err)
+
+                if not entered_debug:
                     log.exception("Actor crashed:")

         # always ship errors back to caller
""" From 31590e82a39c9c16511bfb380cc84bdccc40790c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Jun 2021 09:07:21 -0400 Subject: [PATCH 44/50] Flip "trace" level to "transport" level logging --- tractor/log.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tractor/log.py b/tractor/log.py index e8327a3..ff48d6b 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -29,19 +29,20 @@ LOG_FORMAT = ( DATE_FORMAT = '%b %d %H:%M:%S' LEVELS = { 'GARBAGE': 1, - 'TRACE': 5, - 'PROFILE': 15, - 'RUNTIME': 500, + 'TRANSPORT': 5, + 'RUNTIME': 15, + 'PDB': 500, 'QUIET': 1000, } STD_PALETTE = { 'CRITICAL': 'red', 'ERROR': 'red', - 'RUNTIME': 'white', + 'PDB': 'white', 'WARNING': 'yellow', 'INFO': 'green', + 'RUNTIME': 'white', 'DEBUG': 'white', - 'TRACE': 'cyan', + 'TRANSPORT': 'cyan', 'GARBAGE': 'blue', } BOLD_PALETTE = { @@ -76,7 +77,7 @@ def get_logger( # additional levels for name, val in LEVELS.items(): logging.addLevelName(val, name) - # ex. create ``logger.trace()`` + # ex. create ``logger.runtime()`` setattr(logger, name.lower(), partial(logger.log, val)) return logger From 8c927d708dfc0c5e1d4617f1e4b5814056b3e22a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Jun 2021 13:47:07 -0400 Subject: [PATCH 45/50] Change trace to transport level --- tractor/_actor.py | 5 +++-- tractor/_debug.py | 2 +- tractor/_ipc.py | 26 ++++++++++++++++++-------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index fd7d231..4359e13 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -463,7 +463,8 @@ class Actor: log.runtime( f"already have channel(s) for {uid}:{chans}?" ) - log.trace(f"Registered {chan} for {uid}") # type: ignore + + log.runtime(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -631,7 +632,7 @@ class Actor: break - log.trace( # type: ignore + log.transport( # type: ignore f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') diff --git a/tractor/_debug.py b/tractor/_debug.py index 75e502a..c1b1832 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -102,7 +102,7 @@ class PdbwTeardown(pdbpp.Pdb): # async with aclosing(async_stdin): # async for msg in async_stdin: -# log.trace(f"Stdin input:\n{msg}") +# log.runtime(f"Stdin input:\n{msg}") # # encode to bytes # bmsg = str.encode(msg) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index efe388e..08057e9 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,5 +1,6 @@ """ Inter-process comms abstractions + """ import platform import typing @@ -61,7 +62,6 @@ class MsgpackTCPStream: use_list=False, ) while True: - try: data = await self.stream.receive_some(2**10) @@ -88,7 +88,7 @@ class MsgpackTCPStream: else: raise - log.trace(f"received {data}") # type: ignore + log.transport(f"received {data}") # type: ignore if data == b'': raise TransportClosed( @@ -169,6 +169,7 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( + self, destaddr: Tuple[Any, ...] 
From 8c927d708dfc0c5e1d4617f1e4b5814056b3e22a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 30 Jun 2021 13:47:07 -0400
Subject: [PATCH 45/50] Change trace to transport level

---
 tractor/_actor.py |  5 +++--
 tractor/_debug.py |  2 +-
 tractor/_ipc.py   | 26 ++++++++++++++++++--------
 3 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index fd7d231..4359e13 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -463,7 +463,8 @@ class Actor:
             log.runtime(
                 f"already have channel(s) for {uid}:{chans}?"
             )
-        log.trace(f"Registered {chan} for {uid}")  # type: ignore
+
+        log.runtime(f"Registered {chan} for {uid}")  # type: ignore

         # append new channel
         self._peers[uid].append(chan)
@@ -631,7 +632,7 @@ class Actor:

                     break

-                log.trace(  # type: ignore
+                log.transport(  # type: ignore
                     f"Received msg {msg} from {chan.uid}")

                 cid = msg.get('cid')
diff --git a/tractor/_debug.py b/tractor/_debug.py
index 75e502a..c1b1832 100644
--- a/tractor/_debug.py
+++ b/tractor/_debug.py
@@ -102,7 +102,7 @@ class PdbwTeardown(pdbpp.Pdb):

 #     async with aclosing(async_stdin):
 #         async for msg in async_stdin:
-#             log.trace(f"Stdin input:\n{msg}")
+#             log.runtime(f"Stdin input:\n{msg}")
 #             # encode to bytes
 #             bmsg = str.encode(msg)
diff --git a/tractor/_ipc.py b/tractor/_ipc.py
index efe388e..08057e9 100644
--- a/tractor/_ipc.py
+++ b/tractor/_ipc.py
@@ -1,5 +1,6 @@
 """
 Inter-process comms abstractions
+
 """
 import platform
 import typing
@@ -61,7 +62,6 @@ class MsgpackTCPStream:
             use_list=False,
         )
         while True:
-
             try:
                 data = await self.stream.receive_some(2**10)

@@ -88,7 +88,7 @@ class MsgpackTCPStream:
                 else:
                     raise

-            log.trace(f"received {data}")  # type: ignore
+            log.transport(f"received {data}")  # type: ignore

             if data == b'':
                 raise TransportClosed(
@@ -169,6 +169,7 @@ class Channel:
         return self.msgstream.raddr if self.msgstream else None

     async def connect(
+
         self,
         destaddr: Tuple[Any, ...] = None,
         **kwargs
@@ -180,13 +181,21 @@ class Channel:
         destaddr = destaddr or self._destaddr
         assert isinstance(destaddr, tuple)
-        stream = await trio.open_tcp_stream(*destaddr, **kwargs)
+
+        stream = await trio.open_tcp_stream(
+            *destaddr,
+            **kwargs
+        )
         self.msgstream = MsgpackTCPStream(stream)
+
+        log.transport(
+            f'Opened channel to peer {self.laddr} -> {self.raddr}'
+        )
         return stream

     async def send(self, item: Any) -> None:
-        log.trace(f"send `{item}`")  # type: ignore
+        log.transport(f"send `{item}`")  # type: ignore
         assert self.msgstream
         await self.msgstream.send(item)
@@ -205,7 +214,8 @@ class Channel:
             raise

     async def aclose(self) -> None:
-        log.debug(
+
+        log.transport(
             f'Closing channel to {self.uid} '
             f'{self.laddr} -> {self.raddr}'
         )
@@ -234,11 +244,11 @@ class Channel:
                     await self.connect()
                     cancelled = cancel_scope.cancelled_caught
                     if cancelled:
-                        log.warning(
+                        log.transport(
                             "Reconnect timed out after 3 seconds, retrying...")
                         continue
                     else:
-                        log.warning("Stream connection re-established!")
+                        log.transport("Stream connection re-established!")
                         # run any reconnection sequence
                         on_recon = self._recon_seq
                         if on_recon:
@@ -247,7 +257,7 @@ class Channel:
             except (OSError, ConnectionRefusedError):
                 if not down:
                     down = True
-                    log.warning(
+                    log.transport(
                         f"Connection to {self.raddr} went down, waiting"
                         " for re-establishment")
                     await trio.sleep(1)
From fde52d246438aa3ab6cafea943689532e548d4fd Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 8 Jul 2021 12:48:34 -0400
Subject: [PATCH 46/50] Mypy fixes

---
 tractor/_actor.py     |  2 +-
 tractor/_portal.py    | 16 ++++++----------
 tractor/_streaming.py |  3 +++
 3 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 4359e13..c45449d 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -64,7 +64,7 @@ async def _invoke(
     tb = None

     cancel_scope = trio.CancelScope()
-    cs: trio.CancelScope = None
+    cs: Optional[trio.CancelScope] = None
     ctx = Context(chan, cid)
     context: bool = False
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 4471c8f..44e8630 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -391,9 +391,8 @@ class Portal:
             else:
                 raise

-        _err = None
-        # deliver context instance and .started() msg value in open
-        # tuple.
+        _err: Optional[BaseException] = None
+        # deliver context instance and .started() msg value in open tuple.
         try:
             async with trio.open_nursery() as scope_nursery:
                 ctx = Context(
@@ -403,16 +402,13 @@ class Portal:
                     _recv_chan=recv_chan,
                     _scope_nursery=scope_nursery,
                 )
-                recv_chan._ctx = ctx
+
+                # pairs with handling in ``Actor._push_result()``
+                # recv_chan._ctx = ctx

                 # await trio.lowlevel.checkpoint()
                 yield ctx, first

@@ -462,7 +458,7 @@ class Portal:
             else:
                 log.info(
                     f'Context {fn_name} returned '
-                    f'value from callee `{self._result}`'
+                    f'value from callee `{result}`'
                 )
diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index f23376a..eead6f6 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -332,6 +332,8 @@ class Context:
         Acts as a form of "relay" for a remote error raised
         in the corresponding remote callee task.
         '''
+        assert self._scope_nursery
+
         async def raiser():
             raise unpack_error(msg, self.chan)

@@ -385,6 +387,7 @@ class Context:
             # {'error': trio.Cancelled, cid: "blah"} enough?
             # This probably gets into the discussion in
             # https://github.com/goodboy/tractor/issues/36
+            assert self._scope_nursery
             self._scope_nursery.cancel_scope.cancel()

             if self._recv_chan:

From 25779d48a8b2bfd4975302f05d5cb39b9215641d Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 8 Jul 2021 12:51:35 -0400
Subject: [PATCH 47/50] Define explicit adapter level methods for mypy

---
 tractor/log.py | 43 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 8 deletions(-)

diff --git a/tractor/log.py b/tractor/log.py
index ff48d6b..667c7c6 100644
--- a/tractor/log.py
+++ b/tractor/log.py
@@ -26,14 +26,15 @@ LOG_FORMAT = (
     "  {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}"
     "  {reset}{bold_white}{thin_white}{message}"
 )
+
 DATE_FORMAT = '%b %d %H:%M:%S'
+
 LEVELS = {
-    'GARBAGE': 1,
     'TRANSPORT': 5,
     'RUNTIME': 15,
     'PDB': 500,
-    'QUIET': 1000,
 }
+
 STD_PALETTE = {
     'CRITICAL': 'red',
     'ERROR': 'red',
@@ -43,19 +44,44 @@ STD_PALETTE = {
     'RUNTIME': 'white',
     'DEBUG': 'white',
     'TRANSPORT': 'cyan',
-    'GARBAGE': 'blue',
 }
+
 BOLD_PALETTE = {
     'bold': {
         level: f"bold_{color}"
         for level, color in STD_PALETTE.items()}
 }


+class StackLevelAdapter(logging.LoggerAdapter):
+
+    def transport(
+        self,
+        msg: str,
+
+    ) -> None:
+        return self.log(5, msg)
+
+    def runtime(
+        self,
+        msg: str,
+    ) -> None:
+        return self.log(15, msg)
+
+    def pdb(
+        self,
+        msg: str,
+    ) -> None:
+        return self.log(500, msg)
+
+
 def get_logger(
+
     name: str = None,
     _root_name: str = _proj_name,
-) -> logging.LoggerAdapter:
-    '''Return the package log or a sub-log for `name` if provided.
+
+) -> StackLevelAdapter:
+    '''Return the package log or a sub-logger for ``name`` if provided.
+
     '''
     log = rlog = logging.getLogger(_root_name)

@@ -72,13 +98,14 @@ def get_logger(

     # add our actor-task aware adapter which will dynamically look up
     # the actor and task names at each log emit
-    logger = logging.LoggerAdapter(log, ActorContextInfo())
+    logger = StackLevelAdapter(log, ActorContextInfo())

     # additional levels
     for name, val in LEVELS.items():
         logging.addLevelName(val, name)
-        # ex. create ``logger.runtime()``
-        setattr(logger, name.lower(), partial(logger.log, val))
+
+        # ensure customs levels exist as methods
+        assert getattr(logger, name.lower()), f'Logger does not define {name}'

     return logger
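The trade here is worth spelling out: levels added via ``setattr()`` +
``partial()`` are invisible to mypy, while explicit methods type-check at
every call site. Distilled to a self-contained sketch (the level numbers
mirror the ``LEVELS`` table above):

.. code:: python

    import logging

    TRANSPORT, RUNTIME, PDB = 5, 15, 500


    class StackLevelAdapter(logging.LoggerAdapter):
        # real methods give mypy genuine signatures to check,
        # unlike dynamically ``setattr()``-ed partials
        def transport(self, msg: str) -> None:
            self.log(TRANSPORT, msg)

        def runtime(self, msg: str) -> None:
            self.log(RUNTIME, msg)

        def pdb(self, msg: str) -> None:
            self.log(PDB, msg)


    for name, val in [('TRANSPORT', TRANSPORT), ('RUNTIME', RUNTIME), ('PDB', PDB)]:
        logging.addLevelName(val, name)

    log = StackLevelAdapter(logging.getLogger('demo'), {})
    log.pdb('checks statically *and* logs at level 500')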
From 443ebea165fd7af1f22174b214a3049806a181a1 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 8 Jul 2021 13:02:33 -0400
Subject: [PATCH 48/50] Use "pdb" level logging in debug mode

---
 tractor/_debug.py | 4 ++--
 tractor/_root.py  | 3 +++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/tractor/_debug.py b/tractor/_debug.py
index c1b1832..c76422a 100644
--- a/tractor/_debug.py
+++ b/tractor/_debug.py
@@ -276,7 +276,7 @@ def _set_trace(actor=None):
     pdb = _mk_pdb()

     if actor is not None:
-        log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n")
+        log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")

         pdb.set_trace(
             # start 2 levels up in user code
@@ -306,7 +306,7 @@ breakpoint = partial(


 def _post_mortem(actor):
-    log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
+    log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
     pdb = _mk_pdb()

     # custom Pdb post-mortem entry
diff --git a/tractor/_root.py b/tractor/_root.py
index da4ba68..8391f4c 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -86,6 +86,9 @@ async def open_root_actor(
         # for use of ``await tractor.breakpoint()``
         enable_modules.append('tractor._debug')

+        if loglevel is None:
+            loglevel = 'pdb'
+
     elif debug_mode:
         raise RuntimeError(
             "Debug mode is only supported for the `trio` backend!"

From 69bbf6a957dfd74751e9532591837d22a2fba250 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 8 Jul 2021 13:53:28 -0400
Subject: [PATCH 49/50] Install test deps and py3.9 for type check job

---
 .github/workflows/ci.yml | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d2f6237..e6c19ac 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -6,15 +6,19 @@ jobs:
   mypy:
     name: 'MyPy'
     runs-on: ubuntu-latest
+
     steps:
       - name: Checkout
        uses: actions/checkout@v2
+
       - name: Setup python
         uses: actions/setup-python@v2
         with:
-          python-version: '3.8'
+          python-version: '3.9'
+
       - name: Install dependencies
-        run: pip install -U . --upgrade-strategy eager
+        run: pip install -U . --upgrade-strategy eager -r requirements-test.txt
+
       - name: Run MyPy check
         run: mypy tractor/ --ignore-missing-imports
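With PATCH 48 applied, a crash-handler session emits 'pdb' level msgs by
default. A minimal way to see it in action — this assumes the
``debug_mode`` flag is forwarded through ``open_nursery()`` to
``open_root_actor()``, and ``bork`` is a made-up endpoint:

.. code:: python

    import trio
    import tractor


    async def bork() -> None:
        # hypothetical child task: crash and trigger post-mortem
        raise RuntimeError('kaboom')


    async def main() -> None:
        # debug mode now implies ``loglevel='pdb'`` so the
        # "Attaching to pdb in crashed actor" msgs are visible
        async with tractor.open_nursery(debug_mode=True) as n:
            await n.run_in_actor(bork)


    if __name__ == '__main__':
        trio.run(main)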
From 240f5912340bc9bfacef46cb252d6af8619e937e Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 31 Jul 2021 12:10:25 -0400
Subject: [PATCH 50/50] Add 2-way streaming example to readme and scripts

---
 docs/README.rst                 | 95 ++++++++++++++++++++++++++++++++-
 examples/rpc_bidir_streaming.py | 72 +++++++++++++++++++++++++
 tests/test_docs_examples.py     |  8 ++++---
 3 files changed, 172 insertions(+), 3 deletions(-)
 create mode 100644 examples/rpc_bidir_streaming.py

diff --git a/docs/README.rst b/docs/README.rst
index bd7b6af..18afd26 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -127,7 +127,8 @@ Zombie safe: self-destruct a process tree
             print('This process tree will self-destruct in 1 sec...')
             await trio.sleep(1)

-            # you could have done this yourself
+            # raise an error in root actor/process and trigger
+            # reaping of all minions
             raise Exception('Self Destructed')


@@ -197,6 +198,98 @@ And, yes, there's a built-in crash handling mode B)

 We're hoping to add a respawn-from-repl system soon!

+SC compatible bi-directional streaming
+--------------------------------------
+Yes, you saw it here first; we provide 2-way streams
+with reliable, transitive setup/teardown semantics.
+
+Our nascent api is reminiscent of ``trio.Nursery.start()``
+style invocation:
+
+.. code:: python
+
+    import trio
+    import tractor
+
+
+    @tractor.context
+    async def simple_rpc(
+
+        ctx: tractor.Context,
+        data: int,
+
+    ) -> None:
+        '''Test a small ping-pong 2-way streaming server.
+
+        '''
+        # signal to parent that we're up much like
+        # ``trio_typing.TaskStatus.started()``
+        await ctx.started(data + 1)
+
+        async with ctx.open_stream() as stream:
+
+            count = 0
+            async for msg in stream:
+
+                assert msg == 'ping'
+                await stream.send('pong')
+                count += 1
+
+            else:
+                assert count == 10
+
+
+    async def main() -> None:
+
+        async with tractor.open_nursery() as n:
+
+            portal = await n.start_actor(
+                'rpc_server',
+                enable_modules=[__name__],
+            )
+
+            # XXX: this syntax requires py3.9
+            async with (
+
+                portal.open_context(
+                    simple_rpc,
+                    data=10,
+                ) as (ctx, sent),
+
+                ctx.open_stream() as stream,
+            ):
+
+                assert sent == 11
+
+                count = 0
+                # receive msgs using async for style
+                await stream.send('ping')
+
+                async for msg in stream:
+                    assert msg == 'pong'
+                    await stream.send('ping')
+                    count += 1
+
+                    if count >= 9:
+                        break
+
+
+            # explicitly teardown the daemon-actor
+            await portal.cancel_actor()
+
+
+    if __name__ == '__main__':
+        trio.run(main)
+
+
+See original proposal and discussion in `#53`_ as well
+as follow-up improvements in `#223`_ that we'd love to
+hear your thoughts on!
+
+.. _#53: https://github.com/goodboy/tractor/issues/53
+.. _#223: https://github.com/goodboy/tractor/issues/223
+
+
 Worker poolz are easy peasy
 ---------------------------
 The initial ask from most new users is *"how do I make a worker
diff --git a/examples/rpc_bidir_streaming.py b/examples/rpc_bidir_streaming.py
new file mode 100644
index 0000000..7320081
--- /dev/null
+++ b/examples/rpc_bidir_streaming.py
@@ -0,0 +1,72 @@
+import trio
+import tractor
+
+
+@tractor.context
+async def simple_rpc(
+
+    ctx: tractor.Context,
+    data: int,
+
+) -> None:
+    '''Test a small ping-pong 2-way streaming server.
+
+    '''
+    # signal to parent that we're up much like
+    # ``trio_typing.TaskStatus.started()``
+    await ctx.started(data + 1)
+
+    async with ctx.open_stream() as stream:
+
+        count = 0
+        async for msg in stream:
+
+            assert msg == 'ping'
+            await stream.send('pong')
+            count += 1
+
+        else:
+            assert count == 10
+
+
+async def main() -> None:
+
+    async with tractor.open_nursery() as n:
+
+        portal = await n.start_actor(
+            'rpc_server',
+            enable_modules=[__name__],
+        )
+
+        # XXX: syntax requires py3.9
+        async with (
+
+            portal.open_context(
+                simple_rpc,  # taken from pytest parameterization
+                data=10,
+
+            ) as (ctx, sent),
+
+            ctx.open_stream() as stream,
+        ):
+
+            assert sent == 11
+
+            count = 0
+            # receive msgs using async for style
+            await stream.send('ping')
+
+            async for msg in stream:
+                assert msg == 'pong'
+                await stream.send('ping')
+                count += 1
+
+                if count >= 9:
+                    break
+
+            # explicitly teardown the daemon-actor
+            await portal.cancel_actor()
+
+
+if __name__ == '__main__':
+    trio.run(main)
diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py
index 632d85c..5f47419 100644
--- a/tests/test_docs_examples.py
+++ b/tests/test_docs_examples.py
@@ -84,8 +84,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):

         if '__' not in f
         and f[0] != '_'
-        and 'debugging' not in p[0]
-    ],
+        and 'debugging' not in p[0]],
+
     ids=lambda t: t[1],
 )
 def test_example(run_example_in_subproc, example_script):
@@ -98,6 +98,10 @@ def test_example(run_example_in_subproc, example_script):
     test_example``.
     """
     ex_file = os.path.join(*example_script)
+
+    if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
+        pytest.skip("2-way streaming example requires py3.9 async with syntax")
+
     with open(ex_file, 'r') as ex:
         code = ex.read()
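The version gate closing out the diff exists because the example leans on
py3.9's parenthesized multi-item ``async with``. On older interpreters the
same flow simply nests — a sketch reusing the example module above, which
assumes it's importable (e.g. run from ``examples/``):

.. code:: python

    import trio
    import tractor

    from rpc_bidir_streaming import simple_rpc


    async def main() -> None:
        async with tractor.open_nursery() as n:
            portal = await n.start_actor(
                'rpc_server',
                enable_modules=['rpc_bidir_streaming'],
            )

            # nested form: parses fine pre-3.9
            async with portal.open_context(simple_rpc, data=10) as (ctx, sent):
                async with ctx.open_stream() as stream:
                    assert sent == 11

                    count = 0
                    await stream.send('ping')
                    async for msg in stream:
                        assert msg == 'pong'
                        await stream.send('ping')
                        count += 1
                        if count >= 9:
                            break

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)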