diff --git a/tractor/_actor.py b/tractor/_actor.py index de86df1..3ea99ab 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -50,6 +50,7 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: dict[str, Any], + is_rpc: bool = True, task_status: TaskStatus[ Union[trio.CancelScope, BaseException] @@ -68,9 +69,10 @@ async def _invoke( tb = None cancel_scope = trio.CancelScope() + # activated cancel scope ref cs: Optional[trio.CancelScope] = None - ctx = Context(chan, cid) + ctx = actor.get_context(chan, cid) context: bool = False if getattr(func, '_tractor_stream_function', False): @@ -379,19 +381,19 @@ class Actor: self._no_more_peers.set() self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks.set() + # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: dict[ Tuple[Channel, str], Tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} - # map {uids -> {callids -> waiter queues}} - self._cids2qs: dict[ + + # map {actor uids -> Context} + self._contexts: dict[ Tuple[Tuple[str, str], str], - Tuple[ - trio.abc.SendChannel[Any], - trio.abc.ReceiveChannel[Any] - ] + Context ] = {} + self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ @@ -567,17 +569,7 @@ class Actor: await local_nursery.exited.wait() - # channel cleanup sequence - - # for (channel, cid) in self._rpc_tasks.copy(): - # if channel is chan: - # with trio.CancelScope(shield=True): - # await self._cancel_task(cid, channel) - - # # close all consumer side task mem chans - # send_chan, _ = self._cids2qs[(chan.uid, cid)] - # assert send_chan.cid == cid # type: ignore - # await send_chan.aclose() + # ``Channel`` teardown and closure sequence # Drop ref to channel so it can be gc-ed and disconnected log.runtime(f"Releasing channel {chan} from {chan.uid}") @@ -621,19 +613,15 @@ class Actor: ) -> None: """Push an RPC result to the local consumer's queue. """ - # actorid = chan.uid assert chan.uid, f"`chan.uid` can't be {chan.uid}" - send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] - assert send_chan.cid == cid # type: ignore + ctx = self._contexts[(chan.uid, cid)] + send_chan = ctx._send_chan + # TODO: relaying far end context errors to the local + # context through nursery raising? # if 'error' in msg: - # ctx = getattr(recv_chan, '_ctx', None) - # if ctx: - # ctx._error_from_remote_msg(msg) - + # ctx._error_from_remote_msg(msg) # log.runtime(f"{send_chan} was terminated at remote end") - # # indicate to consumer that far end has stopped - # return await send_chan.aclose() try: log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") @@ -651,23 +639,35 @@ class Actor: # so cancel the far end streaming task log.warning(f"{send_chan} consumer is already closed") - def get_memchans( + def get_context( self, - actorid: Tuple[str, str], - cid: str + chan: Channel, + cid: str, + max_buffer_size: int = 2**6, - ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: + ) -> Context: + ''' + Look up or create a new inter-actor-task-IPC-linked task + "context" which encapsulates the local task's scheduling + enviroment including a ``trio`` cancel scope, a pair of IPC + messaging "feeder" channels, and an RPC id unique to the + task-as-function invocation. - log.runtime(f"Getting result queue for {actorid} cid {cid}") + ''' + log.runtime(f"Getting result queue for {chan.uid} cid {cid}") try: - send_chan, recv_chan = self._cids2qs[(actorid, cid)] + ctx = self._contexts[(chan.uid, cid)] except KeyError: - send_chan, recv_chan = trio.open_memory_channel(2**6) - send_chan.cid = cid # type: ignore - recv_chan.cid = cid # type: ignore - self._cids2qs[(actorid, cid)] = send_chan, recv_chan + send_chan, recv_chan = trio.open_memory_channel(max_buffer_size) + ctx = Context( + chan, + cid, + _send_chan=send_chan, + _recv_chan=recv_chan, + ) + self._contexts[(chan.uid, cid)] = ctx - return send_chan, recv_chan + return ctx async def send_cmd( self, @@ -675,19 +675,21 @@ class Actor: ns: str, func: str, kwargs: dict - ) -> Tuple[str, trio.abc.ReceiveChannel]: + + ) -> Tuple[str, trio.abc.MemoryReceiveChannel]: ''' - Send a ``'cmd'`` message to a remote actor and return a - caller id and a ``trio.Queue`` that can be used to wait for - responses delivered by the local message processing loop. + Send a ``'cmd'`` message to a remote actor and return a caller + id and a ``trio.MemoryReceiveChannel`` message "feeder" channel + that can be used to wait for responses delivered by the local + runtime's message processing loop. ''' cid = str(uuid.uuid4()) assert chan.uid - send_chan, recv_chan = self.get_memchans(chan.uid, cid) + ctx = self.get_context(chan, cid) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return cid, recv_chan + return ctx.cid, ctx._recv_chan async def _process_messages( self, diff --git a/tractor/_portal.py b/tractor/_portal.py index 70339fa..89bd3a9 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -109,16 +109,20 @@ class Portal: async def _submit( self, + ns: str, func: str, kwargs, + ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: - """Submit a function to be scheduled and run by actor, return the - associated caller id, response queue, response type str, - first message packet as a tuple. + ''' + Submit a function to be scheduled and run by actor, return the + associated caller id, response queue, response type str, first + message packet as a tuple. This is an async call. - """ + + ''' # ship a function call request to the remote actor cid, recv_chan = await self.actor.send_cmd( self.channel, ns, func, kwargs) @@ -244,7 +248,8 @@ class Portal: trio.BrokenResourceError, ): log.cancel( - f"{self.channel} for {self.channel.uid} was already closed or broken?") + f"{self.channel} for {self.channel.uid} was already " + "closed or broken?") return False async def run_from_ns( @@ -392,7 +397,8 @@ class Portal: **kwargs, ) -> AsyncGenerator[Tuple[Context, Any], None]: - '''Open an inter-actor task context. + ''' + Open an inter-actor task context. This is a synchronous API which allows for deterministic setup/teardown of a remote task. The yielded ``Context`` further @@ -433,19 +439,20 @@ class Portal: raise _err: Optional[BaseException] = None + + # this should already have been created from the + # ``._submit()`` call above. + ctx = self.actor.get_context(self.channel, cid) + # pairs with handling in ``Actor._push_result()`` + assert ctx._recv_chan is recv_chan + ctx._portal = self + # deliver context instance and .started() msg value in open tuple. try: async with trio.open_nursery() as scope_nursery: - ctx = Context( - self.channel, - cid, - _portal=self, - _recv_chan=recv_chan, - _scope_nursery=scope_nursery, - ) + ctx._scope_nursery = scope_nursery - # pairs with handling in ``Actor._push_result()`` - # recv_chan._ctx = ctx + # do we need this? # await trio.lowlevel.checkpoint() yield ctx, first @@ -496,6 +503,7 @@ class Portal: # we tear down the runtime feeder chan last # to avoid premature stream clobbers. if recv_chan is not None: + # should we encapsulate this in the context api? await recv_chan.aclose() if _err: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index af9ffaf..29be554 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -32,9 +32,10 @@ log = get_logger(__name__) class ReceiveMsgStream(trio.abc.ReceiveChannel): - '''A IPC message stream for receiving logically sequenced values - over an inter-actor ``Channel``. This is the type returned to - a local task which entered either ``Portal.open_stream_from()`` or + ''' + A IPC message stream for receiving logically sequenced values over + an inter-actor ``Channel``. This is the type returned to a local + task which entered either ``Portal.open_stream_from()`` or ``Context.open_stream()``. Termination rules: @@ -177,7 +178,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # In the bidirectional case, `Context.open_stream()` will create # the `Actor._cids2qs` entry from a call to - # `Actor.get_memchans()` and will send the stop message in + # `Actor.get_context()` and will send the stop message in # ``__aexit__()`` on teardown so it **does not** need to be # called here. if not self._ctx._portal: @@ -302,29 +303,38 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): @dataclass class Context: - '''An inter-actor task communication context. + ''' + An inter-actor, ``trio`` task communication context. Allows maintaining task or protocol specific state between 2 communicating actor tasks. A unique context is created on the callee side/end for every request to a remote actor from a portal. A context can be cancelled and (possibly eventually restarted) from - either side of the underlying IPC channel. - - A context can be used to open task oriented message streams and can - be thought of as an IPC aware inter-actor cancel scope. + either side of the underlying IPC channel, open task oriented + message streams and acts as an IPC aware inter-actor-task cancel + scope. ''' chan: Channel cid: str + # these are the "feeder" channels for delivering + # message values to the local task from the runtime + # msg processing loop. + _recv_chan: Optional[trio.MemoryReceiveChannel] = None + _send_chan: Optional[trio.MemorySendChannel] = None + # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa - _recv_chan: Optional[trio.MemoryReceiveChannel] = None _result: Optional[Any] = False + _remote_func_type: str = None + + # status flags _cancel_called: bool = False _started_called: bool = False _started_received: bool = False + _stream_opened: bool = False # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None @@ -424,7 +434,8 @@ class Context: self, ) -> AsyncGenerator[MsgStream, None]: - '''Open a ``MsgStream``, a bi-directional stream connected to the + ''' + Open a ``MsgStream``, a bi-directional stream connected to the cross-actor (far end) task for this ``Context``. This context manager must be entered on both the caller and @@ -467,29 +478,32 @@ class Context: # to send a stop from the caller to the callee in the # single-direction-stream case you'll get a lookup error # currently. - _, recv_chan = actor.get_memchans( - self.chan.uid, - self.cid + ctx = actor.get_context( + self.chan, + self.cid, ) + assert ctx is self # XXX: If the underlying channel feeder receive mem chan has # been closed then likely client code has already exited # a ``.open_stream()`` block prior or there was some other # unanticipated error or cancellation from ``trio``. - if recv_chan._closed: + if ctx._recv_chan._closed: raise trio.ClosedResourceError( 'The underlying channel for this stream was already closed!?') async with MsgStream( ctx=self, - rx_chan=recv_chan, + rx_chan=ctx._recv_chan, ) as rchan: if self._portal: self._portal._streams.add(rchan) try: + self._stream_opened = True + # ensure we aren't cancelled before delivering # the stream # await trio.lowlevel.checkpoint()