From 7f38b7225dd0ca9a9e2264ecebf5ca49fad30de0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 10:03:00 -0400 Subject: [PATCH 01/14] Aggregate and organize streaming components Move receive stream into streaming modules and rebrand as a "message stream". Factor out cancellation mechanics in `.aclose()` into the `Context` type which will soon provide the api for for cancelling portal invocations. Comment-stage a few methods on both types in anticipation of a new bi-directional streaming api. Add a `MsgStream` bidirectional channel type which will be the eventual type yielded from `Context.open_stream()`. Adjust the response/dialog types to be the set `{'asyncfun', 'asyncgen', 'context'}`. OH, and add async func checking in `Portal.run()` to catch and error on sync funcs early. --- tractor/_portal.py | 243 ++++++++++++++++-------------------------- tractor/_streaming.py | 213 ++++++++++++++++++++++++++++++++++-- 2 files changed, 295 insertions(+), 161 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 6d1c802..e1b37fe 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -4,10 +4,9 @@ Portal api import importlib import inspect import typing -from typing import Tuple, Any, Dict, Optional, Set, Iterator +from typing import Tuple, Any, Dict, Optional, Set from functools import partial from dataclasses import dataclass -from contextlib import contextmanager import warnings import trio @@ -17,9 +16,10 @@ from ._state import current_actor from ._ipc import Channel from .log import get_logger from ._exceptions import unpack_error, NoResult, RemoteActorError +from ._streaming import Context, ReceiveMsgStream -log = get_logger('tractor') +log = get_logger(__name__) @asynccontextmanager @@ -39,113 +39,23 @@ async def maybe_open_nursery( yield nursery -class ReceiveStream(trio.abc.ReceiveChannel): - """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with - special behaviour for signalling stream termination across an - inter-actor ``Channel``. This is the type returned to a local task - which invoked a remote streaming function using `Portal.run()`. - - Termination rules: - - if the local task signals stop iteration a cancel signal is - relayed to the remote task indicating to stop streaming - - if the remote task signals the end of a stream, raise a - ``StopAsyncIteration`` to terminate the local ``async for`` - - """ - def __init__( - self, - cid: str, - rx_chan: trio.abc.ReceiveChannel, - portal: 'Portal', - ) -> None: - self._cid = cid - self._rx_chan = rx_chan - self._portal = portal - self._shielded = False - - # delegate directly to underlying mem channel - def receive_nowait(self): - return self._rx_chan.receive_nowait() - - async def receive(self): - try: - msg = await self._rx_chan.receive() - return msg['yield'] - except trio.ClosedResourceError: - # when the send is closed we assume the stream has - # terminated and signal this local iterator to stop - await self.aclose() - raise StopAsyncIteration - except trio.Cancelled: - # relay cancels to the remote task - await self.aclose() - raise - except KeyError: - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - raise unpack_error(msg, self._portal.channel) - - @contextmanager - def shield( - self - ) -> Iterator['ReceiveStream']: # noqa - """Shield this stream's underlying channel such that a local consumer task - can be cancelled (and possibly restarted) using ``trio.Cancelled``. - - """ - self._shielded = True - yield self - self._shielded = False - - async def aclose(self): - """Cancel associated remote actor task and local memory channel - on close. - """ - if self._rx_chan._closed: - log.warning(f"{self} is already closed") - return - - if self._shielded: - log.warning(f"{self} is shielded, portal channel being kept alive") - return - - cid = self._cid - with trio.move_on_after(0.5) as cs: - cs.shield = True - log.warning( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") - - # NOTE: we're telling the far end actor to cancel a task - # corresponding to *this actor*. The far end local channel - # instance is passed to `Actor._cancel_task()` implicitly. - await self._portal.run_from_ns('self', '_cancel_task', cid=cid) - - if cs.cancelled_caught: - # XXX: there's no way to know if the remote task was indeed - # cancelled in the case where the connection is broken or - # some other network error occurred. - if not self._portal.channel.connected(): - log.warning( - "May have failed to cancel remote task " - f"{cid} for {self._portal.channel.uid}") - - with trio.CancelScope(shield=True): - await self._rx_chan.aclose() - - def clone(self): - return self - - class Portal: """A 'portal' to a(n) (remote) ``Actor``. - Allows for invoking remote routines and receiving results through an - underlying ``tractor.Channel`` as though the remote (async) - function / generator was invoked locally. + A portal is "opened" (and eventually closed) by one side of an + inter-actor communication context. The side which opens the portal + is equivalent to a "caller" in function parlance and usually is + either the called actor's parent (in process tree hierarchy terms) + or a client interested in scheduling work to be done remotely in a + far process. + + The portal api allows the "caller" actor to invoke remote routines + and receive results through an underlying ``tractor.Channel`` as + though the remote (async) function / generator was called locally. + It may be thought of loosely as an RPC api where native Python + function calling semantics are supported transparently; hence it is + like having a "portal" between the seperate actor memory spaces. - Think of this like a native async IPC API. """ def __init__(self, channel: Channel) -> None: self.channel = channel @@ -157,7 +67,7 @@ class Portal: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None - self._streams: Set[ReceiveStream] = set() + self._streams: Set[ReceiveMsgStream] = set() self.actor = current_actor() async def _submit( @@ -182,55 +92,19 @@ class Portal: first_msg = await recv_chan.receive() functype = first_msg.get('functype') - if functype == 'asyncfunc': - resp_type = 'return' - elif functype == 'asyncgen': - resp_type = 'yield' - elif 'error' in first_msg: + if 'error' in first_msg: raise unpack_error(first_msg, self.channel) - else: + + elif functype not in ('asyncfunc', 'asyncgen', 'context'): raise ValueError(f"{first_msg} is an invalid response packet?") - return cid, recv_chan, resp_type, first_msg + return cid, recv_chan, functype, first_msg async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, kwargs) - async def run( - self, - func_or_ns: str, - fn_name: Optional[str] = None, - **kwargs - ) -> Any: - """Submit a remote function to be scheduled and run by actor, in - a new task, wrap and return its (stream of) result(s). - - This is a blocking call and returns either a value from the - remote rpc task or a local async generator instance. - """ - if isinstance(func_or_ns, str): - warnings.warn( - "`Portal.run(namespace: str, funcname: str)` is now" - "deprecated, pass a function reference directly instead\n" - "If you still want to run a remote function by name use" - "`Portal.run_from_ns()`", - DeprecationWarning, - stacklevel=2, - ) - fn_mod_path = func_or_ns - assert isinstance(fn_name, str) - - else: # function reference was passed directly - fn = func_or_ns - fn_mod_path = fn.__module__ - fn_name = fn.__name__ - - return await self._return_from_resptype( - *(await self._submit(fn_mod_path, fn_name, kwargs)) - ) - async def run_from_ns( self, namespace_path: str, @@ -260,15 +134,19 @@ class Portal: resptype: str, first_msg: dict ) -> Any: - # TODO: not this needs some serious work and thinking about how - # to make async-generators the fundamental IPC API over channels! - # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': # stream response - rchan = ReceiveStream(cid, recv_chan, self) + + # receive only stream + if resptype == 'asyncgen': + ctx = Context(self.channel, cid, _portal=self) + rchan = ReceiveMsgStream(ctx, recv_chan, self) self._streams.add(rchan) return rchan - elif resptype == 'return': # single response + elif resptype == 'context': # context manager style setup/teardown + # TODO likely not here though + raise NotImplementedError + + elif resptype == 'asyncfunc': # single response msg = await recv_chan.receive() try: return msg['return'] @@ -369,6 +247,65 @@ class Portal: f"{self.channel} for {self.channel.uid} was already closed?") return False + async def run( + self, + func: str, + fn_name: Optional[str] = None, + **kwargs + ) -> Any: + """Submit a remote function to be scheduled and run by actor, in + a new task, wrap and return its (stream of) result(s). + + This is a blocking call and returns either a value from the + remote rpc task or a local async generator instance. + """ + if isinstance(func, str): + warnings.warn( + "`Portal.run(namespace: str, funcname: str)` is now" + "deprecated, pass a function reference directly instead\n" + "If you still want to run a remote function by name use" + "`Portal.run_from_ns()`", + DeprecationWarning, + stacklevel=2, + ) + fn_mod_path = func + assert isinstance(fn_name, str) + + else: # function reference was passed directly + + # TODO: ensure async + if not ( + inspect.isasyncgenfunction(func) or + inspect.iscoroutinefunction(func) + ): + raise TypeError(f'{func} must be an async function!') + + fn = func + fn_mod_path = fn.__module__ + fn_name = fn.__name__ + + return await self._return_from_resptype( + *(await self._submit(fn_mod_path, fn_name, kwargs)) + ) + + # @asynccontextmanager + # async def open_stream_from( + # self, + # async_gen: 'AsyncGeneratorFunction', + # **kwargs, + # ) -> ReceiveMsgStream: + # # TODO + # pass + + # @asynccontextmanager + # async def open_context( + # self, + # func: Callable, + # **kwargs, + # ) -> Context: + # # TODO + # pass + @dataclass class LocalPortal: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index e683216..512c432 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,14 +1,17 @@ import inspect -from contextvars import ContextVar +from contextlib import contextmanager # , asynccontextmanager from dataclasses import dataclass -from typing import Any +from typing import Any, Iterator, Optional +import warnings import trio from ._ipc import Channel +from ._exceptions import unpack_error +from .log import get_logger -_context: ContextVar['Context'] = ContextVar('context') +log = get_logger(__name__) @dataclass(frozen=True) @@ -18,22 +21,73 @@ class Context: Allows maintaining task or protocol specific state between communicating actors. A unique context is created on the receiving end for every request to a remote actor. + + A context can be cancelled and (eventually) restarted from + either side of the underlying IPC channel. + + A context can be used to open task oriented message streams. + """ chan: Channel cid: str - cancel_scope: trio.CancelScope + + # only set on the caller side + _portal: Optional['Portal'] = None # type: ignore # noqa + + # only set on the callee side + _cancel_scope: Optional[trio.CancelScope] = None async def send_yield(self, data: Any) -> None: + + warnings.warn( + "`Context.send_yield()` is now deprecated. " + "Use ``MessageStream.send()``. ", + DeprecationWarning, + stacklevel=2, + ) await self.chan.send({'yield': data, 'cid': self.cid}) async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) + async def cancel(self) -> None: + """Cancel this inter-actor-task context. -def current_context(): - """Get the current task's context instance. - """ - return _context.get() + Request that the far side cancel it's current linked context, + timeout quickly to sidestep 2-generals... + + """ + cid = self.cid + with trio.move_on_after(0.5) as cs: + cs.shield = True + log.warning( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") + + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run_from_ns('self', '_cancel_task', cid=cid) + + if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. + if not self._portal.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self._portal.channel.uid}") + + # async def restart(self) -> None: + # # TODO + # pass + + # @asynccontextmanager + # async def open_stream( + # self, + # ) -> AsyncContextManager: + # # TODO + # pass def stream(func): @@ -47,3 +101,146 @@ def stream(func): f"{func.__name__} must be `ctx: tractor.Context`" ) return func + + +class ReceiveMsgStream(trio.abc.ReceiveChannel): + """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with + special behaviour for signalling stream termination across an + inter-actor ``Channel``. This is the type returned to a local task + which invoked a remote streaming function using `Portal.run()`. + + Termination rules: + - if the local task signals stop iteration a cancel signal is + relayed to the remote task indicating to stop streaming + - if the remote task signals the end of a stream, raise a + ``StopAsyncIteration`` to terminate the local ``async for`` + + """ + def __init__( + self, + ctx: Context, + rx_chan: trio.abc.ReceiveChannel, + portal: 'Portal', # noqa + ) -> None: + self._ctx = ctx + self._rx_chan = rx_chan + self._portal = portal + # self._chan = portal.channel + self._shielded = False + + # delegate directly to underlying mem channel + def receive_nowait(self): + return self._rx_chan.receive_nowait() + + async def receive(self): + try: + msg = await self._rx_chan.receive() + return msg['yield'] + # return msg['yield'] + + except KeyError: + # internal error should never get here + assert msg.get('cid'), ("Received internal error at portal?") + + # TODO: handle 2 cases with 3.10 match syntax + # - 'stop' + # - 'error' + # possibly just handle msg['stop'] here! + + # TODO: test that shows stream raising an expected error!!! + if msg.get('error'): + # raise the error message + raise unpack_error(msg, self._portal.channel) + + except trio.ClosedResourceError: + # XXX: this indicates that a `stop` message was + # sent by the far side of the underlying channel. + # Currently this is triggered by calling ``.aclose()`` on + # the send side of the channel inside + # ``Actor._push_result()``, but maybe it should be put here? + # to avoid exposing the internal mem chan closing mechanism? + # in theory we could instead do some flushing of the channel + # if needed to ensure all consumers are complete before + # triggering closure too early? + + # Locally, we want to close this stream gracefully, by + # terminating any local consumers tasks deterministically. + # We **don't** want to be closing this send channel and not + # relaying a final value to remaining consumers who may not + # have been scheduled to receive it yet? + + # lots of testing to do here + + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + await self.aclose() + raise StopAsyncIteration + + except trio.Cancelled: + # relay cancels to the remote task + await self.aclose() + raise + + @contextmanager + def shield( + self + ) -> Iterator['ReceiveStream']: # noqa + """Shield this stream's underlying channel such that a local consumer task + can be cancelled (and possibly restarted) using ``trio.Cancelled``. + + """ + self._shielded = True + yield self + self._shielded = False + + async def aclose(self): + """Cancel associated remote actor task and local memory channel + on close. + """ + rx_chan = self._rx_chan + + if rx_chan._closed: + log.warning(f"{self} is already closed") + return + + # stats = rx_chan.statistics() + # if stats.open_receive_channels > 1: + # # if we've been cloned don't kill the stream + # log.debug( + # "there are still consumers running keeping stream alive") + # return + + if self._shielded: + log.warning(f"{self} is shielded, portal channel being kept alive") + return + + # close the local mem chan + rx_chan.close() + + # cancel surrounding IPC context + await self._ctx.cancel() + + # TODO: but make it broadcasting to consumers + # def clone(self): + # """Clone this receive channel allowing for multi-task + # consumption from the same channel. + + # """ + # return ReceiveStream( + # self._cid, + # self._rx_chan.clone(), + # self._portal, + # ) + + +class MsgStream(ReceiveMsgStream, trio.abc.Channel): + """ + Bidirectional message stream for use within an inter-actor actor + ``Context```. + + """ + async def send( + self, + data: Any + ) -> None: + await self._chan.send({'yield': data, 'cid': self._cid}) From 897ab79946dbe122eaaf82ed91f3fc697410a12a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 11:07:53 -0400 Subject: [PATCH 02/14] Add a no runtime error --- tractor/_exceptions.py | 4 ++++ tractor/_state.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 63e0d09..6137590 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -46,6 +46,10 @@ class ModuleNotExposed(ModuleNotFoundError): "The requested module is not exposed for RPC" +class NoRuntime(RuntimeError): + "The root actor has not been initialized yet" + + def pack_error(exc: BaseException) -> Dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). diff --git a/tractor/_state.py b/tractor/_state.py index 37fdafa..1d5e2f3 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -7,6 +7,9 @@ import multiprocessing as mp import trio +from ._exceptions import NoRuntime + + _current_actor: Optional['Actor'] = None # type: ignore # noqa _runtime_vars: Dict[str, Any] = { '_debug_mode': False, @@ -19,7 +22,7 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # """Get the process-local actor instance. """ if _current_actor is None and err_on_no_runtime: - raise RuntimeError("No local actor has been initialized yet") + raise NoRuntime("No local actor has been initialized yet") return _current_actor From 81f35584946a520f4255eeb9460d0c53a33b8125 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 11:08:20 -0400 Subject: [PATCH 03/14] Formatting --- tractor/_ipc.py | 1 - tractor/msg.py | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 32b8966..9d34b3a 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -4,7 +4,6 @@ Inter-process comms abstractions import typing from typing import Any, Tuple, Optional from functools import partial -import inspect import msgpack import trio diff --git a/tractor/msg.py b/tractor/msg.py index 5b343b6..560e644 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -29,9 +29,13 @@ async def fan_out_to_ctxs( return tuple(topics2ctxs.keys()) agen = pub_async_gen_func(get_topics=get_topics) + async with aclosing(agen) as pub_gen: + async for published in pub_gen: + ctx_payloads: Dict[str, Any] = {} + for topic, data in published.items(): log.debug(f"publishing {topic, data}") # build a new dict packet or invoke provided packetizer From 36251357b30e3361af4268a23a2be03930c1ff3f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 23:08:45 -0400 Subject: [PATCH 04/14] Add a new one-way stream API NB: this is a breaking change removing support for `Portal.run()` being able to invoke remote streaming functions and instead replacing the method call with an async context manager api `Portal.open_stream_from()` This style explicitly defines stream teardown at the call site instead of expecting the user to handle tricky things correctly themselves: eg. `async_geneartor.aclosing()`. Going forward `Portal.run()` can be used only for invoking async functions. --- tractor/_portal.py | 162 +++++++++++++++++++++++++-------------------- 1 file changed, 91 insertions(+), 71 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index e1b37fe..d2db1f0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -3,8 +3,10 @@ Portal api """ import importlib import inspect -import typing -from typing import Tuple, Any, Dict, Optional, Set +from typing import ( + Tuple, Any, Dict, Optional, Set, + Callable, AsyncGenerator +) from functools import partial from dataclasses import dataclass import warnings @@ -26,7 +28,7 @@ log = get_logger(__name__) async def maybe_open_nursery( nursery: trio.Nursery = None, shield: bool = False, -) -> typing.AsyncGenerator[trio.Nursery, Any]: +) -> AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -39,6 +41,13 @@ async def maybe_open_nursery( yield nursery +def func_deats(func: Callable) -> Tuple[str, str]: + return ( + func.__module__, + func.__name__, + ) + + class Portal: """A 'portal' to a(n) (remote) ``Actor``. @@ -105,57 +114,22 @@ class Portal: "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, kwargs) - async def run_from_ns( - self, - namespace_path: str, - function_name: str, - **kwargs, - ) -> Any: - """Run a function from a (remote) namespace in a new task on the far-end actor. - - This is a more explitcit way to run tasks in a remote-process - actor using explicit object-path syntax. Hint: this is how - `.run()` works underneath. - - Note:: - - A special namespace `self` can be used to invoke `Actor` - instance methods in the remote runtime. Currently this should only - be used for `tractor` internals. - """ - return await self._return_from_resptype( - *(await self._submit(namespace_path, function_name, kwargs)) - ) - - async def _return_from_resptype( + async def _return_once( self, cid: str, recv_chan: trio.abc.ReceiveChannel, resptype: str, first_msg: dict ) -> Any: + assert resptype == 'asyncfunc' # single response - # receive only stream - if resptype == 'asyncgen': - ctx = Context(self.channel, cid, _portal=self) - rchan = ReceiveMsgStream(ctx, recv_chan, self) - self._streams.add(rchan) - return rchan - - elif resptype == 'context': # context manager style setup/teardown - # TODO likely not here though - raise NotImplementedError - - elif resptype == 'asyncfunc': # single response - msg = await recv_chan.receive() - try: - return msg['return'] - except KeyError: - # internal error should never get here - assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, self.channel) - else: - raise ValueError(f"Unknown msg response type: {first_msg}") + msg = await recv_chan.receive() + try: + return msg['return'] + except KeyError: + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, self.channel) async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. @@ -178,9 +152,7 @@ class Portal: assert self._expect_result if self._result is None: try: - self._result = await self._return_from_resptype( - *self._expect_result - ) + self._result = await self._return_once(*self._expect_result) except RemoteActorError as err: self._result = err @@ -247,6 +219,28 @@ class Portal: f"{self.channel} for {self.channel.uid} was already closed?") return False + async def run_from_ns( + self, + namespace_path: str, + function_name: str, + **kwargs, + ) -> Any: + """Run a function from a (remote) namespace in a new task on the far-end actor. + + This is a more explitcit way to run tasks in a remote-process + actor using explicit object-path syntax. Hint: this is how + `.run()` works underneath. + + Note:: + + A special namespace `self` can be used to invoke `Actor` + instance methods in the remote runtime. Currently this should only + be used for `tractor` internals. + """ + return await self._return_once( + *(await self._submit(namespace_path, function_name, kwargs)) + ) + async def run( self, func: str, @@ -272,30 +266,57 @@ class Portal: assert isinstance(fn_name, str) else: # function reference was passed directly - - # TODO: ensure async if not ( - inspect.isasyncgenfunction(func) or inspect.iscoroutinefunction(func) ): raise TypeError(f'{func} must be an async function!') - fn = func - fn_mod_path = fn.__module__ - fn_name = fn.__name__ + fn_mod_path, fn_name = func_deats(func) - return await self._return_from_resptype( + return await self._return_once( *(await self._submit(fn_mod_path, fn_name, kwargs)) ) - # @asynccontextmanager - # async def open_stream_from( - # self, - # async_gen: 'AsyncGeneratorFunction', - # **kwargs, - # ) -> ReceiveMsgStream: - # # TODO - # pass + @asynccontextmanager + async def open_stream_from( + self, + async_gen_func: Callable, # typing: ignore + **kwargs, + ) -> AsyncGenerator[ReceiveMsgStream, None]: + + if not inspect.isasyncgenfunction(async_gen_func): + if not inspect.iscoroutinefunction(async_gen_func) or ( + not getattr(async_gen_func, '_tractor_stream_function', False) + ): + raise TypeError( + f'{async_gen_func} must be an async generator function!') + + fn_mod_path, fn_name = func_deats(async_gen_func) + ( + cid, + recv_chan, + functype, + first_msg + ) = await self._submit(fn_mod_path, fn_name, kwargs) + + # receive only stream + assert functype == 'asyncgen' + + ctx = Context(self.channel, cid, _portal=self) + try: + async with ReceiveMsgStream(ctx, recv_chan, self) as rchan: + self._streams.add(rchan) + yield rchan + finally: + # cancel the far end task on consumer close + try: + await ctx.cancel() + except trio.ClosedResourceError: + # if the far end terminates before we send a cancel the + # underlying transport-channel may already be closed. + log.debug(f'Context {ctx} was already closed?') + + self._streams.remove(rchan) # @asynccontextmanager # async def open_context( @@ -304,7 +325,9 @@ class Portal: # **kwargs, # ) -> Context: # # TODO - # pass + # elif resptype == 'context': # context manager style setup/teardown + # # TODO likely not here though + # raise NotImplementedError @dataclass @@ -324,10 +347,7 @@ class LocalPortal: """ obj = self.actor if ns == 'self' else importlib.import_module(ns) func = getattr(obj, func_name) - if inspect.iscoroutinefunction(func): - return await func(**kwargs) - else: - return func(**kwargs) + return await func(**kwargs) @asynccontextmanager @@ -336,7 +356,7 @@ async def open_portal( nursery: Optional[trio.Nursery] = None, start_msg_loop: bool = True, shield: bool = False, -) -> typing.AsyncGenerator[Portal, None]: +) -> AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing. From 80c96cab01bb611f65687dd343b8fc72f096e1ce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 08:39:50 -0400 Subject: [PATCH 05/14] Add a warning for soon to be deprecated `ctx` use in `@stream` func --- tractor/_streaming.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 512c432..8e8edf8 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -57,6 +57,9 @@ class Context: timeout quickly to sidestep 2-generals... """ + assert self._portal, ( + "No portal found, this is likely a callee side context") + cid = self.cid with trio.move_on_after(0.5) as cs: cs.shield = True @@ -95,7 +98,20 @@ def stream(func): """ func._tractor_stream_function = True sig = inspect.signature(func) - if 'ctx' not in sig.parameters: + params = sig.parameters + if 'stream' not in params and 'ctx' in params: + warnings.warn( + "`@tractr.stream decorated funcs should now declare a `stream` " + " arg, `ctx` is now designated for use with @tractor.context", + DeprecationWarning, + stacklevel=2, + ) + + if ( + 'ctx' not in params and + 'to_trio' not in params and + 'stream' not in params + ): raise TypeError( "The first argument to the stream function " f"{func.__name__} must be `ctx: tractor.Context`" @@ -120,7 +136,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: Context, rx_chan: trio.abc.ReceiveChannel, - portal: 'Portal', # noqa + portal: 'Portal', # type: ignore # noqa ) -> None: self._ctx = ctx self._rx_chan = rx_chan @@ -152,7 +168,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # raise the error message raise unpack_error(msg, self._portal.channel) - except trio.ClosedResourceError: + except (trio.ClosedResourceError, StopAsyncIteration): # XXX: this indicates that a `stop` message was # sent by the far side of the underlying channel. # Currently this is triggered by calling ``.aclose()`` on @@ -184,7 +200,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): @contextmanager def shield( self - ) -> Iterator['ReceiveStream']: # noqa + ) -> Iterator['ReceiveMsgStream']: # noqa """Shield this stream's underlying channel such that a local consumer task can be cancelled (and possibly restarted) using ``trio.Cancelled``. @@ -243,4 +259,4 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): self, data: Any ) -> None: - await self._chan.send({'yield': data, 'cid': self._cid}) + await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) From 3e19fd311bb8589bea1cce0edd0bf405b011d87b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 08:40:22 -0400 Subject: [PATCH 06/14] Move debugger locking to new stream api --- tractor/_debug.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 6040d3f..75e502a 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -182,24 +182,22 @@ def _breakpoint(debug_func) -> Awaitable[None]: _debugger_request_cs = cs try: async with get_root() as portal: - with trio.fail_after(.5): - stream = await portal.run( + async with portal.open_stream_from( tractor._debug._hijack_stdin_relay_to_child, subactor_uid=actor.uid, - ) - async with aclosing(stream): + ) as stream: - # block until first yield above - async for val in stream: + # block until first yield above + async for val in stream: - assert val == 'Locked' - task_status.started() + assert val == 'Locked' + task_status.started() - # with trio.CancelScope(shield=True): - await do_unlock.wait() + # with trio.CancelScope(shield=True): + await do_unlock.wait() - # trigger cancellation of remote stream - break + # trigger cancellation of remote stream + break finally: log.debug(f"Exiting debugger for actor {actor}") global _in_debug From ad9256bcdbdfb9979e867a5cebba65186e3e1cd9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 11:38:10 -0400 Subject: [PATCH 07/14] Drop stream exhaustion; no longer needed --- tractor/_spawn.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f8528ac..a07f60c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -98,17 +98,11 @@ async def exhaust_portal( """ try: log.debug(f"Waiting on final result from {actor.uid}") - final = res = await portal.result() - # if it's an async-gen then alert that we're cancelling it - if inspect.isasyncgen(res): - final = [] - log.warning( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") - final.append(item) + + # XXX: streams should never be reaped here since they should + # always be established and shutdown using a context manager api + final = await portal.result() + except (Exception, trio.MultiError) as err: # we reraise in the parent task via a ``trio.MultiError`` return err From 83af295b450155b861c06e04ea71cd1087fed178 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 11:38:31 -0400 Subject: [PATCH 08/14] Fix func type checking --- tractor/_portal.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index d2db1f0..d82e040 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -110,8 +110,10 @@ class Portal: return cid, recv_chan, functype, first_msg async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: + assert self._expect_result is None, \ "A pending main result has already been submitted" + self._expect_result = await self._submit(ns, func, kwargs) async def _return_once( @@ -266,10 +268,15 @@ class Portal: assert isinstance(fn_name, str) else: # function reference was passed directly - if not ( - inspect.iscoroutinefunction(func) + if ( + not inspect.iscoroutinefunction(func) or + ( + inspect.iscoroutinefunction(func) and + getattr(func, '_tractor_stream_function', False) + ) ): - raise TypeError(f'{func} must be an async function!') + raise TypeError( + f'{func} must be a non-streaming async function!') fn_mod_path, fn_name = func_deats(func) @@ -285,8 +292,9 @@ class Portal: ) -> AsyncGenerator[ReceiveMsgStream, None]: if not inspect.isasyncgenfunction(async_gen_func): - if not inspect.iscoroutinefunction(async_gen_func) or ( - not getattr(async_gen_func, '_tractor_stream_function', False) + if not ( + inspect.iscoroutinefunction(async_gen_func) and + getattr(async_gen_func, '_tractor_stream_function', False) ): raise TypeError( f'{async_gen_func} must be an async generator function!') From 86fc418050cefcfb0dff0bbdb0226059101acc76 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 11:38:55 -0400 Subject: [PATCH 09/14] Error on bad registry pops --- tractor/_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 6333091..8601e83 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1093,4 +1093,4 @@ class Arbiter(Actor): event.set() async def unregister_actor(self, uid: Tuple[str, str]) -> None: - self._registry.pop(uid, None) + self._registry.pop(uid) From f59346d854f2284e81a0562a87d29b54a0bbb967 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 11:39:26 -0400 Subject: [PATCH 10/14] Add func type checking to `.run_in_actor()` --- tractor/_trionics.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 8815562..4674bd9 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -2,6 +2,7 @@ ``trio`` inspired apis and helpers """ from functools import partial +import inspect import multiprocessing as mp from typing import Tuple, List, Dict, Optional import typing @@ -136,6 +137,14 @@ class ActorNursery: # use the run_in_actor nursery nursery=self._ria_nursery, ) + + # XXX: don't allow stream funcs + if not ( + inspect.iscoroutinefunction(fn) and + not getattr(fn, '_tractor_stream_function', False) + ): + raise TypeError(f'{fn} must be an async function!') + # this marks the actor to be cancelled after its portal result # is retreived, see logic in `open_nursery()` below. self._cancel_after_result_on_exit.add(portal) From 5a5e6baad1e8396d51038e9fbf0103986aeec912 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 11:54:56 -0400 Subject: [PATCH 11/14] Update all examples to new streaming API --- examples/asynchronous_generators.py | 5 +-- examples/debugging/multi_daemon_subactors.py | 4 +-- examples/debugging/subactor_breakpoint.py | 2 +- examples/full_fledged_streaming_service.py | 33 +++++++++++++------- examples/multiple_streams_one_portal.py | 11 ++++--- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py index 102b29a..47ee136 100644 --- a/examples/asynchronous_generators.py +++ b/examples/asynchronous_generators.py @@ -24,8 +24,9 @@ async def main(): # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(stream_forever): - print(letter) + async with portal.open_stream_from(stream_forever) as stream: + async for letter in stream: + print(letter) # we support trio's cancellation system assert cancel_scope.cancelled_caught diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index eadb4c1..c37b879 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -26,8 +26,8 @@ async def main(): p1 = await n.start_actor('name_error', enable_modules=[__name__]) # retreive results - stream = await p0.run(breakpoint_forever) - await p1.run(name_error) + async with p0.open_stream_from(breakpoint_forever) as stream: + await p1.run(name_error) if __name__ == '__main__': diff --git a/examples/debugging/subactor_breakpoint.py b/examples/debugging/subactor_breakpoint.py index cb16004..d880404 100644 --- a/examples/debugging/subactor_breakpoint.py +++ b/examples/debugging/subactor_breakpoint.py @@ -21,4 +21,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True) + tractor.run(main, debug_mode=True, loglevel='debug') diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 0ed5f66..51978c0 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -21,7 +21,7 @@ async def aggregate(seed): # fork point portal = await nursery.start_actor( name=f'streamer_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) portals.append(portal) @@ -29,8 +29,11 @@ async def aggregate(seed): send_chan, recv_chan = trio.open_memory_channel(500) async def push_to_chan(portal, send_chan): - async with send_chan: - async for value in await portal.run(stream_data, seed=seed): + async with ( + send_chan, + portal.open_stream_from(stream_data, seed=seed) as stream, + ): + async for value in stream: # leverage trio's built-in backpressure await send_chan.send(value) @@ -71,18 +74,24 @@ async def main(): import time pre_start = time.time() - portal = await nursery.run_in_actor( - aggregate, + portal = await nursery.start_actor( name='aggregator', - seed=seed, + enable_modules=[__name__], ) - start = time.time() - # the portal call returns exactly what you'd expect - # as if the remote "aggregate" function was called locally - result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + async with portal.open_stream_from( + aggregate, + seed=seed, + ) as stream: + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in stream: + result_stream.append(value) + + await portal.cancel_actor() print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") diff --git a/examples/multiple_streams_one_portal.py b/examples/multiple_streams_one_portal.py index 354ec66..3e592a4 100644 --- a/examples/multiple_streams_one_portal.py +++ b/examples/multiple_streams_one_portal.py @@ -15,11 +15,12 @@ async def stream_data(seed=10): async def stream_from_portal(p, consumed): - async for item in await p.run(stream_data): - if item in consumed: - consumed.remove(item) - else: - consumed.append(item) + async with p.open_stream_from(stream_data) as stream: + async for item in stream: + if item in consumed: + consumed.remove(item) + else: + consumed.append(item) async def main(): From 2498a4963bbc567e12f2878477060d65d830fb5a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 11:55:37 -0400 Subject: [PATCH 12/14] Update all tests to new streaming API --- tests/test_cancellation.py | 16 +++-- tests/test_discovery.py | 91 ++++++++++++------------ tests/test_pubsub.py | 82 +++++++++++---------- tests/test_streaming.py | 141 ++++++++++++++++++++----------------- 4 files changed, 178 insertions(+), 152 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 42aec35..eadcb44 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err): async def main(): async with tractor.open_nursery() as nursery: - portal = await nursery.run_in_actor(assert_err, name='errorer', **args) + portal = await nursery.run_in_actor( + assert_err, name='errorer', **args + ) # get result(s) from main task try: @@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method): async with tractor.open_nursery() as n: portal = await n.start_actor( 'donny', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(stream_forever): - print(letter) + async with portal.open_stream_from(stream_forever) as stream: + async for letter in stream: + print(letter) # we support trio's cancellation system assert cancel_scope.cancelled_caught @@ -430,7 +433,6 @@ def test_cancel_via_SIGINT_other_task( tractor.run(main) - async def spin_for(period=3): "Sync sleep." time.sleep(period) @@ -438,7 +440,7 @@ async def spin_for(period=3): async def spawn(): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( + await tn.run_in_actor( spin_for, name='sleeper', ) @@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep( async def main(): with trio.fail_after(2): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( + await tn.run_in_actor( spawn, name='spawn', ) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index eff2897..d1f8474 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0): async def stream_from(portal): - async for value in await portal.result(): - print(value) + async with portal.open_stream_from(stream_forever) as stream: + async for value in stream: + print(value) async def spawn_and_check_registry( @@ -139,18 +140,20 @@ async def spawn_and_check_registry( registry = await get_reg() assert actor.uid in registry - if with_streaming: - to_run = stream_forever - else: - to_run = trio.sleep_forever + try: + async with tractor.open_nursery() as n: + async with trio.open_nursery() as trion: - async with trio.open_nursery() as trion: - try: - async with tractor.open_nursery() as n: portals = {} for i in range(3): name = f'a{i}' - portals[name] = await n.run_in_actor(to_run, name=name) + if with_streaming: + portals[name] = await n.start_actor( + name=name, enable_modules=[__name__]) + + else: # no streaming + portals[name] = await n.run_in_actor( + trio.sleep_forever, name=name) # wait on last actor to come up async with tractor.wait_for_actor(name): @@ -171,19 +174,19 @@ async def spawn_and_check_registry( trion.start_soon(cancel, use_signal, 1) last_p = pts[-1] - async for value in await last_p.result(): - print(value) + await stream_from(last_p) + else: await cancel(use_signal) - finally: - with trio.CancelScope(shield=True): - await trio.sleep(0.5) + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) - # all subactors should have de-registered - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -260,36 +263,36 @@ async def close_chans_before_nursery( get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') async with tractor.open_nursery() as tn: - portal1 = await tn.run_in_actor( - stream_forever, - name='consumer1', - ) - agen1 = await portal1.result() + portal1 = await tn.start_actor( + name='consumer1', enable_modules=[__name__]) + portal2 = await tn.start_actor( + 'consumer2', enable_modules=[__name__]) - portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) - agen2 = await portal2.run(stream_forever) + async with ( + portal1.open_stream_from(stream_forever) as agen1, + portal2.open_stream_from(stream_forever) as agen2, + ): + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block - - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() finally: with trio.CancelScope(shield=True): - await trio.sleep(.5) + await trio.sleep(1) # all subactors should have de-registered registry = await get_reg() diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 3fcb45d..48e65b2 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -61,44 +61,47 @@ async def subs( return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: - stream = await portal.run( - pubber, - topics=which, - seed=seed, - ) - task_status.started(stream) - times = 10 - count = 0 - await stream.__anext__() - async for pkt in stream: - for topic, value in pkt.items(): - assert pred(value) - count += 1 - if count >= times: - break - - await stream.aclose() - - stream = await portal.run( - pubber, - topics=['odd'], - seed=seed, - ) - - await stream.__anext__() - count = 0 - # async with aclosing(stream) as stream: - try: + async with ( + portal.open_stream_from( + pubber, + topics=which, + seed=seed, + ) as stream + ): + task_status.started(stream) + times = 10 + count = 0 + await stream.__anext__() async for pkt in stream: for topic, value in pkt.items(): - pass - # assert pred(value) + assert pred(value) count += 1 if count >= times: break - finally: + await stream.aclose() + async with ( + portal.open_stream_from( + pubber, + topics=['odd'], + seed=seed, + ) as stream + ): + await stream.__anext__() + count = 0 + # async with aclosing(stream) as stream: + try: + async for pkt in stream: + for topic, value in pkt.items(): + pass + # assert pred(value) + count += 1 + if count >= times: + break + finally: + await stream.aclose() + @tractor.msg.pub(tasks=['one', 'two']) async def multilock_pubber(get_topics): @@ -128,11 +131,10 @@ async def test_required_args(callwith_expecterror): await func(**kwargs) else: async with tractor.open_nursery() as n: - # await func(**kwargs) - portal = await n.run_in_actor( - multilock_pubber, + + portal = await n.start_actor( name='pubber', - **kwargs + enable_modules=[__name__], ) async with tractor.wait_for_actor('pubber'): @@ -140,8 +142,14 @@ async def test_required_args(callwith_expecterror): await trio.sleep(0.5) - async for val in await portal.result(): - assert val == {'doggy': 10} + async with portal.open_stream_from( + multilock_pubber, + **kwargs + ) as stream: + async for val in stream: + assert val == {'doggy': 10} + + await portal.cancel_actor() @pytest.mark.parametrize( diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 45fbd5b..9aba327 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -61,37 +61,38 @@ async def stream_from_single_subactor(stream_func): # no brokerd actor found portal = await nursery.start_actor( 'streamerd', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) seq = range(10) - stream = await portal.run( - stream_func, # one of the funcs above + async with portal.open_stream_from( + stream_func, sequence=list(seq), # has to be msgpack serializable - ) - # it'd sure be nice to have an asyncitertools here... - iseq = iter(seq) - ival = next(iseq) + ) as stream: - async for val in stream: - assert val == ival + # it'd sure be nice to have an asyncitertools here... + iseq = iter(seq) + ival = next(iseq) + + async for val in stream: + assert val == ival + + try: + ival = next(iseq) + except StopIteration: + # should cancel far end task which will be + # caught and no error is raised + await stream.aclose() + + await trio.sleep(0.3) try: - ival = next(iseq) - except StopIteration: - # should cancel far end task which will be - # caught and no error is raised - await stream.aclose() - - await trio.sleep(0.3) - - try: - await stream.__anext__() - except StopAsyncIteration: - # stop all spawned subactors - await portal.cancel_actor() - # await nursery.cancel() + await stream.__anext__() + except StopAsyncIteration: + # stop all spawned subactors + await portal.cancel_actor() + # await nursery.cancel() @pytest.mark.parametrize( @@ -132,7 +133,7 @@ async def aggregate(seed): # fork point portal = await nursery.start_actor( name=f'streamer_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) portals.append(portal) @@ -141,11 +142,14 @@ async def aggregate(seed): async def push_to_chan(portal, send_chan): async with send_chan: - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await send_chan.send(value) + + async with portal.open_stream_from( + stream_data, seed=seed, + ) as stream: + + async for value in stream: + # leverage trio's built-in backpressure + await send_chan.send(value) print(f"FINISHED ITERATING {portal.channel.uid}") @@ -183,22 +187,24 @@ async def a_quadruple_example(): seed = int(1e3) pre_start = time.time() - portal = await nursery.run_in_actor( - aggregate, - seed=seed, + portal = await nursery.start_actor( name='aggregator', + enable_modules=[__name__], ) start = time.time() # the portal call returns exactly what you'd expect # as if the remote "aggregate" function was called locally result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + + async with portal.open_stream_from(aggregate, seed=seed) as stream: + async for value in stream: + result_stream.append(value) print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") assert result_stream == list(range(seed)) + await portal.cancel_actor() return result_stream @@ -272,48 +278,55 @@ async def test_respawn_consumer_task( async with tractor.open_nursery() as n: - stream = await(await n.run_in_actor( + portal = await n.start_actor( + name='streamer', + enable_modules=[__name__] + ) + async with portal.open_stream_from( stream_data, seed=11, - name='streamer', - )).result() + ) as stream: - expect = set(range(11)) - received = [] + expect = set(range(11)) + received = [] - # this is the re-spawn task routine - async def consume(task_status=trio.TASK_STATUS_IGNORED): - print('starting consume task..') - nonlocal stream + # this is the re-spawn task routine + async def consume(task_status=trio.TASK_STATUS_IGNORED): + print('starting consume task..') + nonlocal stream - with trio.CancelScope() as cs: - task_status.started(cs) + with trio.CancelScope() as cs: + task_status.started(cs) - # shield stream's underlying channel from cancellation - with stream.shield(): + # shield stream's underlying channel from cancellation + with stream.shield(): - async for v in stream: - print(f'from stream: {v}') - expect.remove(v) - received.append(v) + async for v in stream: + print(f'from stream: {v}') + expect.remove(v) + received.append(v) - print('exited consume') + print('exited consume') - async with trio.open_nursery() as ln: - cs = await ln.start(consume) + async with trio.open_nursery() as ln: + cs = await ln.start(consume) - while True: + while True: - await trio.sleep(0.1) + await trio.sleep(0.1) - if received[-1] % 2 == 0: + if received[-1] % 2 == 0: - print('cancelling consume task..') - cs.cancel() + print('cancelling consume task..') + cs.cancel() - # respawn - cs = await ln.start(consume) + # respawn + cs = await ln.start(consume) - if not expect: - print("all values streamed, BREAKING") - break + if not expect: + print("all values streamed, BREAKING") + break + + # TODO: this is justification for a + # ``ActorNursery.stream_from_actor()`` helper? + await portal.cancel_actor() From b1f657e24654bbfedb329b7ba61548e906813037 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 14:35:25 -0400 Subject: [PATCH 13/14] De-compact async with tuple on 3.8- Turns out can't use the nicer syntax before python 3.9 (even though it doesn't seem documented anywhere?). Relates to #207 --- examples/full_fledged_streaming_service.py | 14 ++++---- tests/test_discovery.py | 42 +++++++++++----------- tests/test_pubsub.py | 25 ++++++------- 3 files changed, 40 insertions(+), 41 deletions(-) diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 51978c0..126eed9 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -29,13 +29,13 @@ async def aggregate(seed): send_chan, recv_chan = trio.open_memory_channel(500) async def push_to_chan(portal, send_chan): - async with ( - send_chan, - portal.open_stream_from(stream_data, seed=seed) as stream, - ): - async for value in stream: - # leverage trio's built-in backpressure - await send_chan.send(value) + + # TODO: https://github.com/goodboy/tractor/issues/207 + async with send_chan: + async with portal.open_stream_from(stream_data, seed=seed) as stream: + async for value in stream: + # leverage trio's built-in backpressure + await send_chan.send(value) print(f"FINISHED ITERATING {portal.channel.uid}") diff --git a/tests/test_discovery.py b/tests/test_discovery.py index d1f8474..af03ce6 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -268,28 +268,30 @@ async def close_chans_before_nursery( portal2 = await tn.start_actor( 'consumer2', enable_modules=[__name__]) - async with ( - portal1.open_stream_from(stream_forever) as agen1, - portal2.open_stream_from(stream_forever) as agen2, - ): - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() + # TODO: compact this back as was in last commit once + # 3.9+, see https://github.com/goodboy/tractor/issues/207 + async with portal1.open_stream_from(stream_forever) as agen1: + async with portal2.open_stream_from( + stream_forever + ) as agen2: + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() finally: with trio.CancelScope(shield=True): await trio.sleep(1) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 48e65b2..0d4c62d 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -60,14 +60,13 @@ async def subs( def pred(i): return isinstance(i, int) + # TODO: https://github.com/goodboy/tractor/issues/207 async with tractor.find_actor(pub_actor_name) as portal: - async with ( - portal.open_stream_from( - pubber, - topics=which, - seed=seed, - ) as stream - ): + async with portal.open_stream_from( + pubber, + topics=which, + seed=seed, + ) as stream: task_status.started(stream) times = 10 count = 0 @@ -81,13 +80,11 @@ async def subs( await stream.aclose() - async with ( - portal.open_stream_from( - pubber, - topics=['odd'], - seed=seed, - ) as stream - ): + async with portal.open_stream_from( + pubber, + topics=['odd'], + seed=seed, + ) as stream: await stream.__anext__() count = 0 # async with aclosing(stream) as stream: From fc36e7362877cfe294baee16bf1a1ea50ee4926c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 16:40:38 -0400 Subject: [PATCH 14/14] Comment out `MsgStream` for now --- tractor/_streaming.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 8e8edf8..0836f4e 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -101,7 +101,7 @@ def stream(func): params = sig.parameters if 'stream' not in params and 'ctx' in params: warnings.warn( - "`@tractr.stream decorated funcs should now declare a `stream` " + "`@tractor.stream decorated funcs should now declare a `stream` " " arg, `ctx` is now designated for use with @tractor.context", DeprecationWarning, stacklevel=2, @@ -141,7 +141,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self._ctx = ctx self._rx_chan = rx_chan self._portal = portal - # self._chan = portal.channel self._shielded = False # delegate directly to underlying mem channel @@ -152,7 +151,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): try: msg = await self._rx_chan.receive() return msg['yield'] - # return msg['yield'] except KeyError: # internal error should never get here @@ -249,14 +247,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # ) -class MsgStream(ReceiveMsgStream, trio.abc.Channel): - """ - Bidirectional message stream for use within an inter-actor actor - ``Context```. +# class MsgStream(ReceiveMsgStream, trio.abc.Channel): +# """ +# Bidirectional message stream for use within an inter-actor actor +# ``Context```. - """ - async def send( - self, - data: Any - ) -> None: - await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) +# """ +# async def send( +# self, +# data: Any +# ) -> None: +# await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})