From 36251357b30e3361af4268a23a2be03930c1ff3f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 23:08:45 -0400 Subject: [PATCH] Add a new one-way stream API NB: this is a breaking change removing support for `Portal.run()` being able to invoke remote streaming functions and instead replacing the method call with an async context manager api `Portal.open_stream_from()` This style explicitly defines stream teardown at the call site instead of expecting the user to handle tricky things correctly themselves: eg. `async_geneartor.aclosing()`. Going forward `Portal.run()` can be used only for invoking async functions. --- tractor/_portal.py | 162 +++++++++++++++++++++++++-------------------- 1 file changed, 91 insertions(+), 71 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index e1b37fe..d2db1f0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -3,8 +3,10 @@ Portal api """ import importlib import inspect -import typing -from typing import Tuple, Any, Dict, Optional, Set +from typing import ( + Tuple, Any, Dict, Optional, Set, + Callable, AsyncGenerator +) from functools import partial from dataclasses import dataclass import warnings @@ -26,7 +28,7 @@ log = get_logger(__name__) async def maybe_open_nursery( nursery: trio.Nursery = None, shield: bool = False, -) -> typing.AsyncGenerator[trio.Nursery, Any]: +) -> AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -39,6 +41,13 @@ async def maybe_open_nursery( yield nursery +def func_deats(func: Callable) -> Tuple[str, str]: + return ( + func.__module__, + func.__name__, + ) + + class Portal: """A 'portal' to a(n) (remote) ``Actor``. @@ -105,57 +114,22 @@ class Portal: "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, kwargs) - async def run_from_ns( - self, - namespace_path: str, - function_name: str, - **kwargs, - ) -> Any: - """Run a function from a (remote) namespace in a new task on the far-end actor. - - This is a more explitcit way to run tasks in a remote-process - actor using explicit object-path syntax. Hint: this is how - `.run()` works underneath. - - Note:: - - A special namespace `self` can be used to invoke `Actor` - instance methods in the remote runtime. Currently this should only - be used for `tractor` internals. - """ - return await self._return_from_resptype( - *(await self._submit(namespace_path, function_name, kwargs)) - ) - - async def _return_from_resptype( + async def _return_once( self, cid: str, recv_chan: trio.abc.ReceiveChannel, resptype: str, first_msg: dict ) -> Any: + assert resptype == 'asyncfunc' # single response - # receive only stream - if resptype == 'asyncgen': - ctx = Context(self.channel, cid, _portal=self) - rchan = ReceiveMsgStream(ctx, recv_chan, self) - self._streams.add(rchan) - return rchan - - elif resptype == 'context': # context manager style setup/teardown - # TODO likely not here though - raise NotImplementedError - - elif resptype == 'asyncfunc': # single response - msg = await recv_chan.receive() - try: - return msg['return'] - except KeyError: - # internal error should never get here - assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, self.channel) - else: - raise ValueError(f"Unknown msg response type: {first_msg}") + msg = await recv_chan.receive() + try: + return msg['return'] + except KeyError: + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, self.channel) async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. @@ -178,9 +152,7 @@ class Portal: assert self._expect_result if self._result is None: try: - self._result = await self._return_from_resptype( - *self._expect_result - ) + self._result = await self._return_once(*self._expect_result) except RemoteActorError as err: self._result = err @@ -247,6 +219,28 @@ class Portal: f"{self.channel} for {self.channel.uid} was already closed?") return False + async def run_from_ns( + self, + namespace_path: str, + function_name: str, + **kwargs, + ) -> Any: + """Run a function from a (remote) namespace in a new task on the far-end actor. + + This is a more explitcit way to run tasks in a remote-process + actor using explicit object-path syntax. Hint: this is how + `.run()` works underneath. + + Note:: + + A special namespace `self` can be used to invoke `Actor` + instance methods in the remote runtime. Currently this should only + be used for `tractor` internals. + """ + return await self._return_once( + *(await self._submit(namespace_path, function_name, kwargs)) + ) + async def run( self, func: str, @@ -272,30 +266,57 @@ class Portal: assert isinstance(fn_name, str) else: # function reference was passed directly - - # TODO: ensure async if not ( - inspect.isasyncgenfunction(func) or inspect.iscoroutinefunction(func) ): raise TypeError(f'{func} must be an async function!') - fn = func - fn_mod_path = fn.__module__ - fn_name = fn.__name__ + fn_mod_path, fn_name = func_deats(func) - return await self._return_from_resptype( + return await self._return_once( *(await self._submit(fn_mod_path, fn_name, kwargs)) ) - # @asynccontextmanager - # async def open_stream_from( - # self, - # async_gen: 'AsyncGeneratorFunction', - # **kwargs, - # ) -> ReceiveMsgStream: - # # TODO - # pass + @asynccontextmanager + async def open_stream_from( + self, + async_gen_func: Callable, # typing: ignore + **kwargs, + ) -> AsyncGenerator[ReceiveMsgStream, None]: + + if not inspect.isasyncgenfunction(async_gen_func): + if not inspect.iscoroutinefunction(async_gen_func) or ( + not getattr(async_gen_func, '_tractor_stream_function', False) + ): + raise TypeError( + f'{async_gen_func} must be an async generator function!') + + fn_mod_path, fn_name = func_deats(async_gen_func) + ( + cid, + recv_chan, + functype, + first_msg + ) = await self._submit(fn_mod_path, fn_name, kwargs) + + # receive only stream + assert functype == 'asyncgen' + + ctx = Context(self.channel, cid, _portal=self) + try: + async with ReceiveMsgStream(ctx, recv_chan, self) as rchan: + self._streams.add(rchan) + yield rchan + finally: + # cancel the far end task on consumer close + try: + await ctx.cancel() + except trio.ClosedResourceError: + # if the far end terminates before we send a cancel the + # underlying transport-channel may already be closed. + log.debug(f'Context {ctx} was already closed?') + + self._streams.remove(rchan) # @asynccontextmanager # async def open_context( @@ -304,7 +325,9 @@ class Portal: # **kwargs, # ) -> Context: # # TODO - # pass + # elif resptype == 'context': # context manager style setup/teardown + # # TODO likely not here though + # raise NotImplementedError @dataclass @@ -324,10 +347,7 @@ class LocalPortal: """ obj = self.actor if ns == 'self' else importlib.import_module(ns) func = getattr(obj, func_name) - if inspect.iscoroutinefunction(func): - return await func(**kwargs) - else: - return func(**kwargs) + return await func(**kwargs) @asynccontextmanager @@ -336,7 +356,7 @@ async def open_portal( nursery: Optional[trio.Nursery] = None, start_msg_loop: bool = True, shield: bool = False, -) -> typing.AsyncGenerator[Portal, None]: +) -> AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing.