forked from goodboy/tractor
1
0
Fork 0

Rework `Actor.send_cmd()` to `.start_remote_task()`

This more formally declares the runtime's remote task startingn API
and uses it throughout all the dependent `Portal` API methods.
Allows dropping `Portal._submit()` and simplifying `.run_in_actor()`
style result waiting to be delegated to the context APIs at remote
task `return` response time. We now also track the remote entrypoint
"type` as `Context._remote_func_type`.
stricter_context_starting
Tyler Goodlet 2021-12-03 16:51:15 -05:00
parent 872b24aedd
commit d307eab118
3 changed files with 92 additions and 98 deletions

View File

@ -616,6 +616,7 @@ class Actor:
assert chan.uid, f"`chan.uid` can't be {chan.uid}" assert chan.uid, f"`chan.uid` can't be {chan.uid}"
ctx = self._contexts[(chan.uid, cid)] ctx = self._contexts[(chan.uid, cid)]
send_chan = ctx._send_chan send_chan = ctx._send_chan
assert send_chan
# TODO: relaying far end context errors to the local # TODO: relaying far end context errors to the local
# context through nursery raising? # context through nursery raising?
@ -655,9 +656,13 @@ class Actor:
''' '''
log.runtime(f"Getting result queue for {chan.uid} cid {cid}") log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
actor_uid = chan.uid
assert actor_uid
try: try:
ctx = self._contexts[(chan.uid, cid)] ctx = self._contexts[(actor_uid, cid)]
except KeyError: except KeyError:
send_chan: trio.MemorySendChannel
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(max_buffer_size) send_chan, recv_chan = trio.open_memory_channel(max_buffer_size)
ctx = Context( ctx = Context(
chan, chan,
@ -665,23 +670,25 @@ class Actor:
_send_chan=send_chan, _send_chan=send_chan,
_recv_chan=recv_chan, _recv_chan=recv_chan,
) )
self._contexts[(chan.uid, cid)] = ctx self._contexts[(actor_uid, cid)] = ctx
return ctx return ctx
async def send_cmd( async def start_remote_task(
self, self,
chan: Channel, chan: Channel,
ns: str, ns: str,
func: str, func: str,
kwargs: dict kwargs: dict
) -> Tuple[str, trio.abc.MemoryReceiveChannel]: ) -> Context:
''' '''
Send a ``'cmd'`` message to a remote actor and return a caller Send a ``'cmd'`` message to a remote actor, which starts
id and a ``trio.MemoryReceiveChannel`` message "feeder" channel a remote task-as-function entrypoint.
that can be used to wait for responses delivered by the local
runtime's message processing loop. Synchronously validates the endpoint type and return a caller
side task ``Context`` that can be used to wait for responses
delivered by the local runtime's message processing loop.
''' '''
cid = str(uuid.uuid4()) cid = str(uuid.uuid4())
@ -689,7 +696,20 @@ class Actor:
ctx = self.get_context(chan, cid) ctx = self.get_context(chan, cid)
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return ctx.cid, ctx._recv_chan
# Wait on first response msg and validate; this should be
# immediate.
first_msg = await ctx._recv_chan.receive()
functype = first_msg.get('functype')
if 'error' in first_msg:
raise unpack_error(first_msg, chan)
elif functype not in ('asyncfunc', 'asyncgen', 'context'):
raise ValueError(f"{first_msg} is an invalid response packet?")
ctx._remote_func_type = functype
return ctx
async def _process_messages( async def _process_messages(
self, self,

View File

@ -6,7 +6,7 @@ concurrency linked tasks running in disparate memory domains.
import importlib import importlib
import inspect import inspect
from typing import ( from typing import (
Tuple, Any, Dict, Optional, Set, Any, Optional,
Callable, AsyncGenerator Callable, AsyncGenerator
) )
from functools import partial from functools import partial
@ -49,7 +49,7 @@ async def maybe_open_nursery(
yield nursery yield nursery
def func_deats(func: Callable) -> Tuple[str, str]: def func_deats(func: Callable) -> tuple[str, str]:
return ( return (
func.__module__, func.__module__,
func.__name__, func.__name__,
@ -98,72 +98,45 @@ class Portal:
# during the portal's lifetime # during the portal's lifetime
self._result_msg: Optional[dict] = None self._result_msg: Optional[dict] = None
# When this is set to a tuple returned from ``_submit()`` then # When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some # it is expected that ``result()`` will be awaited at some
# point. Set when _submit_for_result is called # point.
self._expect_result: Optional[ self._expect_result: Optional[Context] = None
Tuple[str, Any, str, Dict[str, Any]] self._streams: set[ReceiveMsgStream] = set()
] = None
self._streams: Set[ReceiveMsgStream] = set()
self.actor = current_actor() self.actor = current_actor()
async def _submit( async def _submit_for_result(
self, self,
ns: str, ns: str,
func: str, func: str,
kwargs, **kwargs
) -> None:
) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]:
'''
Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str, first
message packet as a tuple.
This is an async call.
'''
# ship a function call request to the remote actor
cid, recv_chan = await self.actor.send_cmd(
self.channel, ns, func, kwargs)
# wait on first response msg and handle (this should be
# in an immediate response)
first_msg = await recv_chan.receive()
functype = first_msg.get('functype')
if 'error' in first_msg:
raise unpack_error(first_msg, self.channel)
elif functype not in ('asyncfunc', 'asyncgen', 'context'):
raise ValueError(f"{first_msg} is an invalid response packet?")
return cid, recv_chan, functype, first_msg
async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
assert self._expect_result is None, \ assert self._expect_result is None, \
"A pending main result has already been submitted" "A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, kwargs) self._expect_result = await self.actor.start_remote_task(
self.channel,
ns,
func,
kwargs
)
async def _return_once( async def _return_once(
self, self,
cid: str, ctx: Context,
recv_chan: trio.abc.ReceiveChannel,
resptype: str,
first_msg: dict
) -> dict[str, Any]: ) -> dict[str, Any]:
assert resptype == 'asyncfunc' # single response assert ctx._remote_func_type == 'asyncfunc' # single response
msg = await recv_chan.receive() msg = await ctx._recv_chan.receive()
return msg return msg
async def result(self) -> Any: async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task. '''
""" Return the result(s) from the remote actor's "main" task.
'''
# Check for non-rpc errors slapped on the # Check for non-rpc errors slapped on the
# channel for which we always raise # channel for which we always raise
exc = self.channel._exc exc = self.channel._exc
@ -182,7 +155,9 @@ class Portal:
assert self._expect_result assert self._expect_result
if self._result_msg is None: if self._result_msg is None:
self._result_msg = await self._return_once(*self._expect_result) self._result_msg = await self._return_once(
self._expect_result
)
return _unwrap_msg(self._result_msg, self.channel) return _unwrap_msg(self._result_msg, self.channel)
@ -275,7 +250,12 @@ class Portal:
''' '''
msg = await self._return_once( msg = await self._return_once(
*(await self._submit(namespace_path, function_name, kwargs)) await self.actor.start_remote_task(
self.channel,
namespace_path,
function_name,
kwargs,
)
) )
return _unwrap_msg(msg, self.channel) return _unwrap_msg(msg, self.channel)
@ -320,7 +300,12 @@ class Portal:
return _unwrap_msg( return _unwrap_msg(
await self._return_once( await self._return_once(
*(await self._submit(fn_mod_path, fn_name, kwargs)), await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs,
)
), ),
self.channel, self.channel,
) )
@ -342,27 +327,21 @@ class Portal:
f'{async_gen_func} must be an async generator function!') f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = func_deats(async_gen_func) fn_mod_path, fn_name = func_deats(async_gen_func)
( ctx = await self.actor.start_remote_task(
cid,
recv_chan,
functype,
first_msg
) = await self._submit(fn_mod_path, fn_name, kwargs)
# receive only stream
assert functype == 'asyncgen'
ctx = Context(
self.channel, self.channel,
cid, fn_mod_path,
# do we need this to be closed implicitly? fn_name,
# _recv_chan=recv_chan, kwargs
_portal=self
) )
ctx._portal = self
# ensure receive-only stream entrypoint
assert ctx._remote_func_type == 'asyncgen'
try: try:
# deliver receive only stream # deliver receive only stream
async with ReceiveMsgStream( async with ReceiveMsgStream(
ctx, recv_chan, ctx, ctx._recv_chan,
) as rchan: ) as rchan:
self._streams.add(rchan) self._streams.add(rchan)
yield rchan yield rchan
@ -396,7 +375,7 @@ class Portal:
func: Callable, func: Callable,
**kwargs, **kwargs,
) -> AsyncGenerator[Tuple[Context, Any], None]: ) -> AsyncGenerator[tuple[Context, Any], None]:
''' '''
Open an inter-actor task context. Open an inter-actor task context.
@ -415,14 +394,14 @@ class Portal:
f'{func} must be an async generator function!') f'{func} must be an async generator function!')
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
ctx = await self.actor.start_remote_task(
recv_chan: Optional[trio.MemoryReceiveChannel] = None self.channel,
fn_mod_path,
cid, recv_chan, functype, first_msg = await self._submit( fn_name,
fn_mod_path, fn_name, kwargs) kwargs
)
assert functype == 'context' assert ctx._remote_func_type == 'context'
msg = await recv_chan.receive() msg = await ctx._recv_chan.receive()
try: try:
# the "first" value here is delivered by the callee's # the "first" value here is delivered by the callee's
@ -439,12 +418,6 @@ class Portal:
raise raise
_err: Optional[BaseException] = None _err: Optional[BaseException] = None
# this should already have been created from the
# ``._submit()`` call above.
ctx = self.actor.get_context(self.channel, cid)
# pairs with handling in ``Actor._push_result()``
assert ctx._recv_chan is recv_chan
ctx._portal = self ctx._portal = self
# deliver context instance and .started() msg value in open tuple. # deliver context instance and .started() msg value in open tuple.
@ -502,9 +475,9 @@ class Portal:
# operating *in* this scope to have survived # operating *in* this scope to have survived
# we tear down the runtime feeder chan last # we tear down the runtime feeder chan last
# to avoid premature stream clobbers. # to avoid premature stream clobbers.
if recv_chan is not None: if ctx._recv_chan is not None:
# should we encapsulate this in the context api? # should we encapsulate this in the context api?
await recv_chan.aclose() await ctx._recv_chan.aclose()
if _err: if _err:
if ctx._cancel_called: if ctx._cancel_called:

View File

@ -322,13 +322,14 @@ class Context:
# these are the "feeder" channels for delivering # these are the "feeder" channels for delivering
# message values to the local task from the runtime # message values to the local task from the runtime
# msg processing loop. # msg processing loop.
_recv_chan: Optional[trio.MemoryReceiveChannel] = None _recv_chan: trio.MemoryReceiveChannel
_send_chan: Optional[trio.MemorySendChannel] = None _send_chan: trio.MemorySendChannel
_remote_func_type: Optional[str] = None
# only set on the caller side # only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa _portal: Optional['Portal'] = None # type: ignore # noqa
_result: Optional[Any] = False _result: Optional[Any] = False
_remote_func_type: str = None
# status flags # status flags
_cancel_called: bool = False _cancel_called: bool = False
@ -474,7 +475,7 @@ class Context:
) )
# NOTE: in one way streaming this only happens on the # NOTE: in one way streaming this only happens on the
# caller side inside `Actor.send_cmd()` so if you try # caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the # to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error # single-direction-stream case you'll get a lookup error
# currently. # currently.