Add a new one-way stream API

NB: this is a breaking change removing support for `Portal.run()` being
able to invoke remote streaming functions and instead replacing the
method call with an async context manager api `Portal.open_stream_from()`
This style explicitly defines stream teardown at the call site instead
of expecting the user to handle tricky things correctly themselves: eg.
`async_geneartor.aclosing()`. Going forward `Portal.run()` can be used
only for invoking async functions.
stream_contexts
Tyler Goodlet 2021-04-27 23:08:45 -04:00
parent 81f3558494
commit 36251357b3
1 changed files with 91 additions and 71 deletions

View File

@ -3,8 +3,10 @@ Portal api
"""
import importlib
import inspect
import typing
from typing import Tuple, Any, Dict, Optional, Set
from typing import (
Tuple, Any, Dict, Optional, Set,
Callable, AsyncGenerator
)
from functools import partial
from dataclasses import dataclass
import warnings
@ -26,7 +28,7 @@ log = get_logger(__name__)
async def maybe_open_nursery(
nursery: trio.Nursery = None,
shield: bool = False,
) -> typing.AsyncGenerator[trio.Nursery, Any]:
) -> AsyncGenerator[trio.Nursery, Any]:
"""Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
@ -39,6 +41,13 @@ async def maybe_open_nursery(
yield nursery
def func_deats(func: Callable) -> Tuple[str, str]:
return (
func.__module__,
func.__name__,
)
class Portal:
"""A 'portal' to a(n) (remote) ``Actor``.
@ -105,48 +114,15 @@ class Portal:
"A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, kwargs)
async def run_from_ns(
self,
namespace_path: str,
function_name: str,
**kwargs,
) -> Any:
"""Run a function from a (remote) namespace in a new task on the far-end actor.
This is a more explitcit way to run tasks in a remote-process
actor using explicit object-path syntax. Hint: this is how
`.run()` works underneath.
Note::
A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this should only
be used for `tractor` internals.
"""
return await self._return_from_resptype(
*(await self._submit(namespace_path, function_name, kwargs))
)
async def _return_from_resptype(
async def _return_once(
self,
cid: str,
recv_chan: trio.abc.ReceiveChannel,
resptype: str,
first_msg: dict
) -> Any:
assert resptype == 'asyncfunc' # single response
# receive only stream
if resptype == 'asyncgen':
ctx = Context(self.channel, cid, _portal=self)
rchan = ReceiveMsgStream(ctx, recv_chan, self)
self._streams.add(rchan)
return rchan
elif resptype == 'context': # context manager style setup/teardown
# TODO likely not here though
raise NotImplementedError
elif resptype == 'asyncfunc': # single response
msg = await recv_chan.receive()
try:
return msg['return']
@ -154,8 +130,6 @@ class Portal:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, self.channel)
else:
raise ValueError(f"Unknown msg response type: {first_msg}")
async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task.
@ -178,9 +152,7 @@ class Portal:
assert self._expect_result
if self._result is None:
try:
self._result = await self._return_from_resptype(
*self._expect_result
)
self._result = await self._return_once(*self._expect_result)
except RemoteActorError as err:
self._result = err
@ -247,6 +219,28 @@ class Portal:
f"{self.channel} for {self.channel.uid} was already closed?")
return False
async def run_from_ns(
self,
namespace_path: str,
function_name: str,
**kwargs,
) -> Any:
"""Run a function from a (remote) namespace in a new task on the far-end actor.
This is a more explitcit way to run tasks in a remote-process
actor using explicit object-path syntax. Hint: this is how
`.run()` works underneath.
Note::
A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this should only
be used for `tractor` internals.
"""
return await self._return_once(
*(await self._submit(namespace_path, function_name, kwargs))
)
async def run(
self,
func: str,
@ -272,30 +266,57 @@ class Portal:
assert isinstance(fn_name, str)
else: # function reference was passed directly
# TODO: ensure async
if not (
inspect.isasyncgenfunction(func) or
inspect.iscoroutinefunction(func)
):
raise TypeError(f'{func} must be an async function!')
fn = func
fn_mod_path = fn.__module__
fn_name = fn.__name__
fn_mod_path, fn_name = func_deats(func)
return await self._return_from_resptype(
return await self._return_once(
*(await self._submit(fn_mod_path, fn_name, kwargs))
)
# @asynccontextmanager
# async def open_stream_from(
# self,
# async_gen: 'AsyncGeneratorFunction',
# **kwargs,
# ) -> ReceiveMsgStream:
# # TODO
# pass
@asynccontextmanager
async def open_stream_from(
self,
async_gen_func: Callable, # typing: ignore
**kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func):
if not inspect.iscoroutinefunction(async_gen_func) or (
not getattr(async_gen_func, '_tractor_stream_function', False)
):
raise TypeError(
f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = func_deats(async_gen_func)
(
cid,
recv_chan,
functype,
first_msg
) = await self._submit(fn_mod_path, fn_name, kwargs)
# receive only stream
assert functype == 'asyncgen'
ctx = Context(self.channel, cid, _portal=self)
try:
async with ReceiveMsgStream(ctx, recv_chan, self) as rchan:
self._streams.add(rchan)
yield rchan
finally:
# cancel the far end task on consumer close
try:
await ctx.cancel()
except trio.ClosedResourceError:
# if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed.
log.debug(f'Context {ctx} was already closed?')
self._streams.remove(rchan)
# @asynccontextmanager
# async def open_context(
@ -304,7 +325,9 @@ class Portal:
# **kwargs,
# ) -> Context:
# # TODO
# pass
# elif resptype == 'context': # context manager style setup/teardown
# # TODO likely not here though
# raise NotImplementedError
@dataclass
@ -324,10 +347,7 @@ class LocalPortal:
"""
obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func_name)
if inspect.iscoroutinefunction(func):
return await func(**kwargs)
else:
return func(**kwargs)
@asynccontextmanager
@ -336,7 +356,7 @@ async def open_portal(
nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True,
shield: bool = False,
) -> typing.AsyncGenerator[Portal, None]:
) -> AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing.