Use `tractor.Context` throughout the runtime core
Instead of tracking feeder mem chans per RPC dialog, store `Context` instances which (now) hold refs to the underlying RPC-task feeder chans and track them inside a `Actor._contexts` map. This begins a transition to making the "context" idea the primitive abstraction for representing messaging dialogs between tasks in different memory domains (i.e. usually separate processes). A slew of changes made this possible: - change `Actor.get_memchans()` -> `.get_context()`. - Add new `Context._send_chan` and `._recv_chan` vars. - implicitly create a new context on every `Actor.send_cmd()` call. - use the context created by `.send_cmd()` in `Portal.open_context()` instead of manually creating one. - call `Actor.get_context()` inside tasks run from `._invoke()` such that feeder chans are implicitly created for callee tasks thus fixing the bug #265. NB: We might change some of the internal semantics to do with *when* the feeder chans are actually created to denote whether or not a far end task is actually *read to receive* messages. For example, in the cases where it **never** will be ready to receive messages (one-way streaming, a context that never opens a stream, etc.) we will likely want some kind of error or at least warning to the caller that messages can't be sent (yet).stricter_context_starting
parent
3f6099f161
commit
c5c3f7e789
|
@ -50,6 +50,7 @@ async def _invoke(
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
func: typing.Callable,
|
func: typing.Callable,
|
||||||
kwargs: dict[str, Any],
|
kwargs: dict[str, Any],
|
||||||
|
|
||||||
is_rpc: bool = True,
|
is_rpc: bool = True,
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Union[trio.CancelScope, BaseException]
|
Union[trio.CancelScope, BaseException]
|
||||||
|
@ -68,9 +69,10 @@ async def _invoke(
|
||||||
tb = None
|
tb = None
|
||||||
|
|
||||||
cancel_scope = trio.CancelScope()
|
cancel_scope = trio.CancelScope()
|
||||||
|
# activated cancel scope ref
|
||||||
cs: Optional[trio.CancelScope] = None
|
cs: Optional[trio.CancelScope] = None
|
||||||
|
|
||||||
ctx = Context(chan, cid)
|
ctx = actor.get_context(chan, cid)
|
||||||
context: bool = False
|
context: bool = False
|
||||||
|
|
||||||
if getattr(func, '_tractor_stream_function', False):
|
if getattr(func, '_tractor_stream_function', False):
|
||||||
|
@ -379,19 +381,19 @@ class Actor:
|
||||||
self._no_more_peers.set()
|
self._no_more_peers.set()
|
||||||
self._ongoing_rpc_tasks = trio.Event()
|
self._ongoing_rpc_tasks = trio.Event()
|
||||||
self._ongoing_rpc_tasks.set()
|
self._ongoing_rpc_tasks.set()
|
||||||
|
|
||||||
# (chan, cid) -> (cancel_scope, func)
|
# (chan, cid) -> (cancel_scope, func)
|
||||||
self._rpc_tasks: dict[
|
self._rpc_tasks: dict[
|
||||||
Tuple[Channel, str],
|
Tuple[Channel, str],
|
||||||
Tuple[trio.CancelScope, typing.Callable, trio.Event]
|
Tuple[trio.CancelScope, typing.Callable, trio.Event]
|
||||||
] = {}
|
] = {}
|
||||||
# map {uids -> {callids -> waiter queues}}
|
|
||||||
self._cids2qs: dict[
|
# map {actor uids -> Context}
|
||||||
|
self._contexts: dict[
|
||||||
Tuple[Tuple[str, str], str],
|
Tuple[Tuple[str, str], str],
|
||||||
Tuple[
|
Context
|
||||||
trio.abc.SendChannel[Any],
|
|
||||||
trio.abc.ReceiveChannel[Any]
|
|
||||||
]
|
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
self._listeners: List[trio.abc.Listener] = []
|
self._listeners: List[trio.abc.Listener] = []
|
||||||
self._parent_chan: Optional[Channel] = None
|
self._parent_chan: Optional[Channel] = None
|
||||||
self._forkserver_info: Optional[
|
self._forkserver_info: Optional[
|
||||||
|
@ -567,17 +569,7 @@ class Actor:
|
||||||
|
|
||||||
await local_nursery.exited.wait()
|
await local_nursery.exited.wait()
|
||||||
|
|
||||||
# channel cleanup sequence
|
# ``Channel`` teardown and closure sequence
|
||||||
|
|
||||||
# for (channel, cid) in self._rpc_tasks.copy():
|
|
||||||
# if channel is chan:
|
|
||||||
# with trio.CancelScope(shield=True):
|
|
||||||
# await self._cancel_task(cid, channel)
|
|
||||||
|
|
||||||
# # close all consumer side task mem chans
|
|
||||||
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
|
|
||||||
# assert send_chan.cid == cid # type: ignore
|
|
||||||
# await send_chan.aclose()
|
|
||||||
|
|
||||||
# Drop ref to channel so it can be gc-ed and disconnected
|
# Drop ref to channel so it can be gc-ed and disconnected
|
||||||
log.runtime(f"Releasing channel {chan} from {chan.uid}")
|
log.runtime(f"Releasing channel {chan} from {chan.uid}")
|
||||||
|
@ -621,19 +613,15 @@ class Actor:
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Push an RPC result to the local consumer's queue.
|
"""Push an RPC result to the local consumer's queue.
|
||||||
"""
|
"""
|
||||||
# actorid = chan.uid
|
|
||||||
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
|
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
|
||||||
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
|
ctx = self._contexts[(chan.uid, cid)]
|
||||||
assert send_chan.cid == cid # type: ignore
|
send_chan = ctx._send_chan
|
||||||
|
|
||||||
|
# TODO: relaying far end context errors to the local
|
||||||
|
# context through nursery raising?
|
||||||
# if 'error' in msg:
|
# if 'error' in msg:
|
||||||
# ctx = getattr(recv_chan, '_ctx', None)
|
# ctx._error_from_remote_msg(msg)
|
||||||
# if ctx:
|
|
||||||
# ctx._error_from_remote_msg(msg)
|
|
||||||
|
|
||||||
# log.runtime(f"{send_chan} was terminated at remote end")
|
# log.runtime(f"{send_chan} was terminated at remote end")
|
||||||
# # indicate to consumer that far end has stopped
|
|
||||||
# return await send_chan.aclose()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
|
log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
|
||||||
|
@ -651,23 +639,35 @@ class Actor:
|
||||||
# so cancel the far end streaming task
|
# so cancel the far end streaming task
|
||||||
log.warning(f"{send_chan} consumer is already closed")
|
log.warning(f"{send_chan} consumer is already closed")
|
||||||
|
|
||||||
def get_memchans(
|
def get_context(
|
||||||
self,
|
self,
|
||||||
actorid: Tuple[str, str],
|
chan: Channel,
|
||||||
cid: str
|
cid: str,
|
||||||
|
max_buffer_size: int = 2**6,
|
||||||
|
|
||||||
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
|
) -> Context:
|
||||||
|
'''
|
||||||
|
Look up or create a new inter-actor-task-IPC-linked task
|
||||||
|
"context" which encapsulates the local task's scheduling
|
||||||
|
enviroment including a ``trio`` cancel scope, a pair of IPC
|
||||||
|
messaging "feeder" channels, and an RPC id unique to the
|
||||||
|
task-as-function invocation.
|
||||||
|
|
||||||
log.runtime(f"Getting result queue for {actorid} cid {cid}")
|
'''
|
||||||
|
log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
|
||||||
try:
|
try:
|
||||||
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
ctx = self._contexts[(chan.uid, cid)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
send_chan, recv_chan = trio.open_memory_channel(2**6)
|
send_chan, recv_chan = trio.open_memory_channel(max_buffer_size)
|
||||||
send_chan.cid = cid # type: ignore
|
ctx = Context(
|
||||||
recv_chan.cid = cid # type: ignore
|
chan,
|
||||||
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
cid,
|
||||||
|
_send_chan=send_chan,
|
||||||
|
_recv_chan=recv_chan,
|
||||||
|
)
|
||||||
|
self._contexts[(chan.uid, cid)] = ctx
|
||||||
|
|
||||||
return send_chan, recv_chan
|
return ctx
|
||||||
|
|
||||||
async def send_cmd(
|
async def send_cmd(
|
||||||
self,
|
self,
|
||||||
|
@ -675,19 +675,21 @@ class Actor:
|
||||||
ns: str,
|
ns: str,
|
||||||
func: str,
|
func: str,
|
||||||
kwargs: dict
|
kwargs: dict
|
||||||
) -> Tuple[str, trio.abc.ReceiveChannel]:
|
|
||||||
|
) -> Tuple[str, trio.abc.MemoryReceiveChannel]:
|
||||||
'''
|
'''
|
||||||
Send a ``'cmd'`` message to a remote actor and return a
|
Send a ``'cmd'`` message to a remote actor and return a caller
|
||||||
caller id and a ``trio.Queue`` that can be used to wait for
|
id and a ``trio.MemoryReceiveChannel`` message "feeder" channel
|
||||||
responses delivered by the local message processing loop.
|
that can be used to wait for responses delivered by the local
|
||||||
|
runtime's message processing loop.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
cid = str(uuid.uuid4())
|
cid = str(uuid.uuid4())
|
||||||
assert chan.uid
|
assert chan.uid
|
||||||
send_chan, recv_chan = self.get_memchans(chan.uid, cid)
|
ctx = self.get_context(chan, cid)
|
||||||
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
||||||
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
||||||
return cid, recv_chan
|
return ctx.cid, ctx._recv_chan
|
||||||
|
|
||||||
async def _process_messages(
|
async def _process_messages(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -109,16 +109,20 @@ class Portal:
|
||||||
|
|
||||||
async def _submit(
|
async def _submit(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
ns: str,
|
ns: str,
|
||||||
func: str,
|
func: str,
|
||||||
kwargs,
|
kwargs,
|
||||||
|
|
||||||
) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]:
|
) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]:
|
||||||
"""Submit a function to be scheduled and run by actor, return the
|
'''
|
||||||
associated caller id, response queue, response type str,
|
Submit a function to be scheduled and run by actor, return the
|
||||||
first message packet as a tuple.
|
associated caller id, response queue, response type str, first
|
||||||
|
message packet as a tuple.
|
||||||
|
|
||||||
This is an async call.
|
This is an async call.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
# ship a function call request to the remote actor
|
# ship a function call request to the remote actor
|
||||||
cid, recv_chan = await self.actor.send_cmd(
|
cid, recv_chan = await self.actor.send_cmd(
|
||||||
self.channel, ns, func, kwargs)
|
self.channel, ns, func, kwargs)
|
||||||
|
@ -244,7 +248,8 @@ class Portal:
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"{self.channel} for {self.channel.uid} was already closed or broken?")
|
f"{self.channel} for {self.channel.uid} was already "
|
||||||
|
"closed or broken?")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def run_from_ns(
|
async def run_from_ns(
|
||||||
|
@ -392,7 +397,8 @@ class Portal:
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> AsyncGenerator[Tuple[Context, Any], None]:
|
) -> AsyncGenerator[Tuple[Context, Any], None]:
|
||||||
'''Open an inter-actor task context.
|
'''
|
||||||
|
Open an inter-actor task context.
|
||||||
|
|
||||||
This is a synchronous API which allows for deterministic
|
This is a synchronous API which allows for deterministic
|
||||||
setup/teardown of a remote task. The yielded ``Context`` further
|
setup/teardown of a remote task. The yielded ``Context`` further
|
||||||
|
@ -433,19 +439,20 @@ class Portal:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
_err: Optional[BaseException] = None
|
_err: Optional[BaseException] = None
|
||||||
|
|
||||||
|
# this should already have been created from the
|
||||||
|
# ``._submit()`` call above.
|
||||||
|
ctx = self.actor.get_context(self.channel, cid)
|
||||||
|
# pairs with handling in ``Actor._push_result()``
|
||||||
|
assert ctx._recv_chan is recv_chan
|
||||||
|
ctx._portal = self
|
||||||
|
|
||||||
# deliver context instance and .started() msg value in open tuple.
|
# deliver context instance and .started() msg value in open tuple.
|
||||||
try:
|
try:
|
||||||
async with trio.open_nursery() as scope_nursery:
|
async with trio.open_nursery() as scope_nursery:
|
||||||
ctx = Context(
|
ctx._scope_nursery = scope_nursery
|
||||||
self.channel,
|
|
||||||
cid,
|
|
||||||
_portal=self,
|
|
||||||
_recv_chan=recv_chan,
|
|
||||||
_scope_nursery=scope_nursery,
|
|
||||||
)
|
|
||||||
|
|
||||||
# pairs with handling in ``Actor._push_result()``
|
# do we need this?
|
||||||
# recv_chan._ctx = ctx
|
|
||||||
# await trio.lowlevel.checkpoint()
|
# await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
yield ctx, first
|
yield ctx, first
|
||||||
|
@ -496,6 +503,7 @@ class Portal:
|
||||||
# we tear down the runtime feeder chan last
|
# we tear down the runtime feeder chan last
|
||||||
# to avoid premature stream clobbers.
|
# to avoid premature stream clobbers.
|
||||||
if recv_chan is not None:
|
if recv_chan is not None:
|
||||||
|
# should we encapsulate this in the context api?
|
||||||
await recv_chan.aclose()
|
await recv_chan.aclose()
|
||||||
|
|
||||||
if _err:
|
if _err:
|
||||||
|
|
|
@ -32,9 +32,10 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
'''A IPC message stream for receiving logically sequenced values
|
'''
|
||||||
over an inter-actor ``Channel``. This is the type returned to
|
A IPC message stream for receiving logically sequenced values over
|
||||||
a local task which entered either ``Portal.open_stream_from()`` or
|
an inter-actor ``Channel``. This is the type returned to a local
|
||||||
|
task which entered either ``Portal.open_stream_from()`` or
|
||||||
``Context.open_stream()``.
|
``Context.open_stream()``.
|
||||||
|
|
||||||
Termination rules:
|
Termination rules:
|
||||||
|
@ -177,7 +178,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
|
|
||||||
# In the bidirectional case, `Context.open_stream()` will create
|
# In the bidirectional case, `Context.open_stream()` will create
|
||||||
# the `Actor._cids2qs` entry from a call to
|
# the `Actor._cids2qs` entry from a call to
|
||||||
# `Actor.get_memchans()` and will send the stop message in
|
# `Actor.get_context()` and will send the stop message in
|
||||||
# ``__aexit__()`` on teardown so it **does not** need to be
|
# ``__aexit__()`` on teardown so it **does not** need to be
|
||||||
# called here.
|
# called here.
|
||||||
if not self._ctx._portal:
|
if not self._ctx._portal:
|
||||||
|
@ -302,29 +303,38 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Context:
|
class Context:
|
||||||
'''An inter-actor task communication context.
|
'''
|
||||||
|
An inter-actor, ``trio`` task communication context.
|
||||||
|
|
||||||
Allows maintaining task or protocol specific state between
|
Allows maintaining task or protocol specific state between
|
||||||
2 communicating actor tasks. A unique context is created on the
|
2 communicating actor tasks. A unique context is created on the
|
||||||
callee side/end for every request to a remote actor from a portal.
|
callee side/end for every request to a remote actor from a portal.
|
||||||
|
|
||||||
A context can be cancelled and (possibly eventually restarted) from
|
A context can be cancelled and (possibly eventually restarted) from
|
||||||
either side of the underlying IPC channel.
|
either side of the underlying IPC channel, open task oriented
|
||||||
|
message streams and acts as an IPC aware inter-actor-task cancel
|
||||||
A context can be used to open task oriented message streams and can
|
scope.
|
||||||
be thought of as an IPC aware inter-actor cancel scope.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
chan: Channel
|
chan: Channel
|
||||||
cid: str
|
cid: str
|
||||||
|
|
||||||
|
# these are the "feeder" channels for delivering
|
||||||
|
# message values to the local task from the runtime
|
||||||
|
# msg processing loop.
|
||||||
|
_recv_chan: Optional[trio.MemoryReceiveChannel] = None
|
||||||
|
_send_chan: Optional[trio.MemorySendChannel] = None
|
||||||
|
|
||||||
# only set on the caller side
|
# only set on the caller side
|
||||||
_portal: Optional['Portal'] = None # type: ignore # noqa
|
_portal: Optional['Portal'] = None # type: ignore # noqa
|
||||||
_recv_chan: Optional[trio.MemoryReceiveChannel] = None
|
|
||||||
_result: Optional[Any] = False
|
_result: Optional[Any] = False
|
||||||
|
_remote_func_type: str = None
|
||||||
|
|
||||||
|
# status flags
|
||||||
_cancel_called: bool = False
|
_cancel_called: bool = False
|
||||||
_started_called: bool = False
|
_started_called: bool = False
|
||||||
_started_received: bool = False
|
_started_received: bool = False
|
||||||
|
_stream_opened: bool = False
|
||||||
|
|
||||||
# only set on the callee side
|
# only set on the callee side
|
||||||
_scope_nursery: Optional[trio.Nursery] = None
|
_scope_nursery: Optional[trio.Nursery] = None
|
||||||
|
@ -424,7 +434,8 @@ class Context:
|
||||||
self,
|
self,
|
||||||
|
|
||||||
) -> AsyncGenerator[MsgStream, None]:
|
) -> AsyncGenerator[MsgStream, None]:
|
||||||
'''Open a ``MsgStream``, a bi-directional stream connected to the
|
'''
|
||||||
|
Open a ``MsgStream``, a bi-directional stream connected to the
|
||||||
cross-actor (far end) task for this ``Context``.
|
cross-actor (far end) task for this ``Context``.
|
||||||
|
|
||||||
This context manager must be entered on both the caller and
|
This context manager must be entered on both the caller and
|
||||||
|
@ -467,29 +478,32 @@ class Context:
|
||||||
# to send a stop from the caller to the callee in the
|
# to send a stop from the caller to the callee in the
|
||||||
# single-direction-stream case you'll get a lookup error
|
# single-direction-stream case you'll get a lookup error
|
||||||
# currently.
|
# currently.
|
||||||
_, recv_chan = actor.get_memchans(
|
ctx = actor.get_context(
|
||||||
self.chan.uid,
|
self.chan,
|
||||||
self.cid
|
self.cid,
|
||||||
)
|
)
|
||||||
|
assert ctx is self
|
||||||
|
|
||||||
# XXX: If the underlying channel feeder receive mem chan has
|
# XXX: If the underlying channel feeder receive mem chan has
|
||||||
# been closed then likely client code has already exited
|
# been closed then likely client code has already exited
|
||||||
# a ``.open_stream()`` block prior or there was some other
|
# a ``.open_stream()`` block prior or there was some other
|
||||||
# unanticipated error or cancellation from ``trio``.
|
# unanticipated error or cancellation from ``trio``.
|
||||||
|
|
||||||
if recv_chan._closed:
|
if ctx._recv_chan._closed:
|
||||||
raise trio.ClosedResourceError(
|
raise trio.ClosedResourceError(
|
||||||
'The underlying channel for this stream was already closed!?')
|
'The underlying channel for this stream was already closed!?')
|
||||||
|
|
||||||
async with MsgStream(
|
async with MsgStream(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
rx_chan=recv_chan,
|
rx_chan=ctx._recv_chan,
|
||||||
) as rchan:
|
) as rchan:
|
||||||
|
|
||||||
if self._portal:
|
if self._portal:
|
||||||
self._portal._streams.add(rchan)
|
self._portal._streams.add(rchan)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
self._stream_opened = True
|
||||||
|
|
||||||
# ensure we aren't cancelled before delivering
|
# ensure we aren't cancelled before delivering
|
||||||
# the stream
|
# the stream
|
||||||
# await trio.lowlevel.checkpoint()
|
# await trio.lowlevel.checkpoint()
|
||||||
|
|
Loading…
Reference in New Issue