diff --git a/tractor/__init__.py b/tractor/__init__.py index 12123a2..aa26210 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -15,18 +15,20 @@ # along with this program. If not, see . """ -tractor: structured concurrent "actors". +tractor: structured concurrent ``trio``-"actors". """ from exceptiongroup import BaseExceptionGroup from ._clustering import open_actor_cluster from ._ipc import Channel -from ._streaming import ( +from ._context import ( Context, + context, +) +from ._streaming import ( MsgStream, stream, - context, ) from ._discovery import ( get_arbiter, diff --git a/tractor/_context.py b/tractor/_context.py new file mode 100644 index 0000000..4e52b21 --- /dev/null +++ b/tractor/_context.py @@ -0,0 +1,767 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +The fundamental cross process SC abstraction: an inter-actor, +cancel-scope linked task "context". + +A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built +into each ``trio.Nursery`` except it links the lifetimes of memory space +disjoint, parallel executing tasks in separate actors. + +''' +from __future__ import annotations +from collections import deque +from contextlib import asynccontextmanager as acm +from dataclasses import ( + dataclass, + field, +) +from functools import partial +import inspect +from pprint import pformat +from typing import ( + Any, + Callable, + AsyncGenerator, + TYPE_CHECKING, +) +import warnings + +import trio + +from ._exceptions import ( + unpack_error, + pack_error, + ContextCancelled, + StreamOverrun, +) +from .log import get_logger +from ._ipc import Channel +from ._streaming import MsgStream +from ._state import current_actor + +if TYPE_CHECKING: + from ._portal import Portal + + +log = get_logger(__name__) + + +@dataclass +class Context: + ''' + An inter-actor, ``trio`` task communication context. + + NB: This class should never be instatiated directly, it is delivered + by either, + - runtime machinery to a remotely started task or, + - by entering ``Portal.open_context()``. + + Allows maintaining task or protocol specific state between + 2 communicating actor tasks. A unique context is created on the + callee side/end for every request to a remote actor from a portal. + + A context can be cancelled and (possibly eventually restarted) from + either side of the underlying IPC channel, open task oriented + message streams and acts as an IPC aware inter-actor-task cancel + scope. + + ''' + chan: Channel + cid: str + + # these are the "feeder" channels for delivering + # message values to the local task from the runtime + # msg processing loop. + _recv_chan: trio.MemoryReceiveChannel + _send_chan: trio.MemorySendChannel + + _remote_func_type: str | None = None + + # only set on the caller side + _portal: Portal | None = None # type: ignore # noqa + _result: Any | int = None + _remote_error: BaseException | None = None + + # cancellation state + _cancel_called: bool = False + _cancel_called_remote: tuple | None = None + _cancel_msg: str | None = None + _scope: trio.CancelScope | None = None + _enter_debugger_on_cancel: bool = True + + @property + def cancel_called(self) -> bool: + ''' + Records whether cancellation has been requested for this context + by either an explicit call to ``.cancel()`` or an implicit call + due to an error caught inside the ``Portal.open_context()`` + block. + + ''' + return self._cancel_called + + @property + def cancel_called_remote(self) -> tuple[str, str] | None: + ''' + ``Actor.uid`` of the remote actor who's task was cancelled + causing this side of the context to also be cancelled. + + ''' + remote_uid = self._cancel_called_remote + if remote_uid: + return tuple(remote_uid) + + @property + def cancelled_caught(self) -> bool: + return self._scope.cancelled_caught + + # init and streaming state + _started_called: bool = False + _started_received: bool = False + _stream_opened: bool = False + + # overrun handling machinery + # NOTE: none of this provides "backpressure" to the remote + # task, only an ability to not lose messages when the local + # task is configured to NOT transmit ``StreamOverrun``s back + # to the other side. + _overflow_q: deque[dict] = field( + default_factory=partial( + deque, + maxlen=616, + ) + ) + _scope_nursery: trio.Nursery | None = None + _in_overrun: bool = False + _allow_overruns: bool = False + + async def send_yield( + self, + data: Any, + + ) -> None: + + warnings.warn( + "`Context.send_yield()` is now deprecated. " + "Use ``MessageStream.send()``. ", + DeprecationWarning, + stacklevel=2, + ) + await self.chan.send({'yield': data, 'cid': self.cid}) + + async def send_stop(self) -> None: + await self.chan.send({'stop': True, 'cid': self.cid}) + + async def _maybe_cancel_and_set_remote_error( + self, + error_msg: dict[str, Any], + + ) -> None: + ''' + (Maybe) unpack and raise a msg error into the local scope + nursery for this context. + + Acts as a form of "relay" for a remote error raised + in the corresponding remote callee task. + + ''' + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. + error = unpack_error( + error_msg, + self.chan, + ) + + # XXX: set the remote side's error so that after we cancel + # whatever task is the opener of this context it can raise + # that error as the reason. + self._remote_error = error + + if ( + isinstance(error, ContextCancelled) + ): + log.cancel( + 'Remote task-context sucessfully cancelled for ' + f'{self.chan.uid}:{self.cid}' + ) + + if self._cancel_called: + # this is an expected cancel request response message + # and we don't need to raise it in scope since it will + # potentially override a real error + return + else: + log.error( + f'Remote context error for {self.chan.uid}:{self.cid}:\n' + f'{error_msg["error"]["tb_str"]}' + ) + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? + # YES! this is way better and simpler! + if ( + self._scope + ): + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + self._cancel_called_remote = self.chan.uid + self._scope.cancel() + + # NOTE: this usage actually works here B) + # from ._debug import breakpoint + # await breakpoint() + + # XXX: this will break early callee results sending + # since when `.result()` is finally called, this + # chan will be closed.. + # if self._recv_chan: + # await self._recv_chan.aclose() + + async def cancel( + self, + msg: str | None = None, + timeout: float = 0.5, + # timeout: float = 1000, + + ) -> None: + ''' + Cancel this inter-actor-task context. + + Request that the far side cancel it's current linked context, + Timeout quickly in an attempt to sidestep 2-generals... + + ''' + side = 'caller' if self._portal else 'callee' + if msg: + assert side == 'callee', 'Only callee side can provide cancel msg' + + log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') + + self._cancel_called = True + # await _debug.breakpoint() + # breakpoint() + + if side == 'caller': + if not self._portal: + raise RuntimeError( + "No portal found, this is likely a callee side context" + ) + + cid = self.cid + with trio.move_on_after(timeout) as cs: + # cs.shield = True + log.cancel( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") + + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run_from_ns( + 'self', + '_cancel_task', + cid=cid, + ) + # print("EXITING CANCEL CALL") + + if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. + # if not self._portal.channel.connected(): + if not self.chan.connected(): + log.cancel( + "May have failed to cancel remote task " + f"{cid} for {self._portal.channel.uid}") + else: + log.cancel( + "Timed out on cancelling remote task " + f"{cid} for {self._portal.channel.uid}") + + # callee side remote task + else: + self._cancel_msg = msg + + # TODO: should we have an explicit cancel message + # or is relaying the local `trio.Cancelled` as an + # {'error': trio.Cancelled, cid: "blah"} enough? + # This probably gets into the discussion in + # https://github.com/goodboy/tractor/issues/36 + assert self._scope + self._scope.cancel() + + @acm + async def open_stream( + + self, + allow_overruns: bool | None = False, + msg_buffer_size: int | None = None, + + ) -> AsyncGenerator[MsgStream, None]: + ''' + Open a ``MsgStream``, a bi-directional stream connected to the + cross-actor (far end) task for this ``Context``. + + This context manager must be entered on both the caller and + callee for the stream to logically be considered "connected". + + A ``MsgStream`` is currently "one-shot" use, meaning if you + close it you can not "re-open" it for streaming and instead you + must re-establish a new surrounding ``Context`` using + ``Portal.open_context()``. In the future this may change but + currently there seems to be no obvious reason to support + "re-opening": + - pausing a stream can be done with a message. + - task errors will normally require a restart of the entire + scope of the inter-actor task context due to the nature of + ``trio``'s cancellation system. + + ''' + actor = current_actor() + + # here we create a mem chan that corresponds to the + # far end caller / callee. + + # Likewise if the surrounding context has been cancelled we error here + # since it likely means the surrounding block was exited or + # killed + + if self._cancel_called: + task = trio.lowlevel.current_task().name + raise ContextCancelled( + f'Context around {actor.uid[0]}:{task} was already cancelled!' + ) + + if not self._portal and not self._started_called: + raise RuntimeError( + 'Context.started()` must be called before opening a stream' + ) + + # NOTE: in one way streaming this only happens on the + # caller side inside `Actor.start_remote_task()` so if you try + # to send a stop from the caller to the callee in the + # single-direction-stream case you'll get a lookup error + # currently. + ctx = actor.get_context( + self.chan, + self.cid, + msg_buffer_size=msg_buffer_size, + allow_overruns=allow_overruns, + ) + ctx._allow_overruns = allow_overruns + assert ctx is self + + # XXX: If the underlying channel feeder receive mem chan has + # been closed then likely client code has already exited + # a ``.open_stream()`` block prior or there was some other + # unanticipated error or cancellation from ``trio``. + + if ctx._recv_chan._closed: + raise trio.ClosedResourceError( + 'The underlying channel for this stream was already closed!?') + + async with MsgStream( + ctx=self, + rx_chan=ctx._recv_chan, + ) as stream: + + if self._portal: + self._portal._streams.add(stream) + + try: + self._stream_opened = True + + # XXX: do we need this? + # ensure we aren't cancelled before yielding the stream + # await trio.lowlevel.checkpoint() + yield stream + + # NOTE: Make the stream "one-shot use". On exit, signal + # ``trio.EndOfChannel``/``StopAsyncIteration`` to the + # far end. + await stream.aclose() + + finally: + if self._portal: + try: + self._portal._streams.remove(stream) + except KeyError: + log.warning( + f'Stream was already destroyed?\n' + f'actor: {self.chan.uid}\n' + f'ctx id: {self.cid}' + ) + + def _maybe_raise_remote_err( + self, + err: Exception, + ) -> None: + # NOTE: whenever the context's "opener" side (task) **is** + # the side which requested the cancellation (likekly via + # ``Context.cancel()``), we don't want to re-raise that + # cancellation signal locally (would be akin to + # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` + # whenever ``CancelScope.cancel()`` was called) and instead + # silently reap the expected cancellation "error"-msg. + # if 'pikerd' in err.msgdata['tb_str']: + # # from . import _debug + # # await _debug.breakpoint() + # breakpoint() + + if ( + isinstance(err, ContextCancelled) + and ( + self._cancel_called + or self.chan._cancel_called + or tuple(err.canceller) == current_actor().uid + ) + ): + return err + + raise err # from None + + async def result(self) -> Any | Exception: + ''' + From some (caller) side task, wait for and return the final + result from the remote (callee) side's task. + + This provides a mechanism for one task running in some actor to wait + on another task at the other side, in some other actor, to terminate. + + If the remote task is still in a streaming state (it is delivering + values from inside a ``Context.open_stream():`` block, then those + msgs are drained but discarded since it is presumed this side of + the context has already finished with its own streaming logic. + + If the remote context (or its containing actor runtime) was + canceled, either by a local task calling one of + ``Context.cancel()`` or `Portal.cancel_actor()``, we ignore the + received ``ContextCancelled`` exception if the context or + underlying IPC channel is marked as having been "cancel called". + This is similar behavior to using ``trio.Nursery.cancel()`` + wherein tasks which raise ``trio.Cancel`` are silently reaped; + the main different in this API is in the "cancel called" case, + instead of just not raising, we also return the exception *as + the result* since client code may be interested in the details + of the remote cancellation. + + ''' + assert self._portal, "Context.result() can not be called from callee!" + assert self._recv_chan + + # from . import _debug + # await _debug.breakpoint() + + re = self._remote_error + if re: + self._maybe_raise_remote_err(re) + return re + + if ( + self._result == id(self) + and not self._remote_error + and not self._recv_chan._closed # type: ignore + ): + # wait for a final context result consuming + # and discarding any bi dir stream msgs still + # in transit from the far end. + while True: + msg = await self._recv_chan.receive() + try: + self._result = msg['return'] + + # NOTE: we don't need to do this right? + # XXX: only close the rx mem chan AFTER + # a final result is retreived. + # if self._recv_chan: + # await self._recv_chan.aclose() + + break + except KeyError: # as msgerr: + + if 'yield' in msg: + # far end task is still streaming to us so discard + log.warning(f'Discarding stream delivered {msg}') + continue + + elif 'stop' in msg: + log.debug('Remote stream terminated') + continue + + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + + err = unpack_error( + msg, + self._portal.channel + ) # from msgerr + + err = self._maybe_raise_remote_err(err) + self._remote_err = err + + return self._remote_error or self._result + + async def started( + self, + value: Any | None = None + + ) -> None: + ''' + Indicate to calling actor's task that this linked context + has started and send ``value`` to the other side. + + On the calling side ``value`` is the second item delivered + in the tuple returned by ``Portal.open_context()``. + + ''' + if self._portal: + raise RuntimeError( + f"Caller side context {self} can not call started!") + + elif self._started_called: + raise RuntimeError( + f"called 'started' twice on context with {self.chan.uid}") + + await self.chan.send({'started': value, 'cid': self.cid}) + self._started_called = True + + # TODO: do we need a restart api? + # async def restart(self) -> None: + # pass + + async def _drain_overflows( + self, + ) -> None: + ''' + Private task spawned to push newly received msgs to the local + task which getting overrun by the remote side. + + In order to not block the rpc msg loop, but also not discard + msgs received in this context, we need to async push msgs in + a new task which only runs for as long as the local task is in + an overrun state. + + ''' + self._in_overrun = True + try: + while self._overflow_q: + # NOTE: these msgs should never be errors since we always do + # the check prior to checking if we're in an overrun state + # inside ``.deliver_msg()``. + msg = self._overflow_q.popleft() + try: + await self._send_chan.send(msg) + except trio.BrokenResourceError: + log.warning( + f"{self._send_chan} consumer is already closed" + ) + return + except trio.Cancelled: + # we are obviously still in overrun + # but the context is being closed anyway + # so we just warn that there are un received + # msgs still.. + self._overflow_q.appendleft(msg) + fmt_msgs = '' + for msg in self._overflow_q: + fmt_msgs += f'{pformat(msg)}\n' + + log.warning( + f'Context for {self.cid} is being closed while ' + 'in an overrun state!\n' + 'Discarding the following msgs:\n' + f'{fmt_msgs}\n' + ) + raise + + finally: + # task is now finished with the backlog so mark us as + # no longer in backlog. + self._in_overrun = False + + async def _deliver_msg( + self, + msg: dict, + + draining: bool = False, + + ) -> bool: + + cid = self.cid + chan = self.chan + uid = chan.uid + send_chan: trio.MemorySendChannel = self._send_chan + + log.runtime( + f"Delivering {msg} from {uid} to caller {cid}" + ) + + error = msg.get('error') + if error: + await self._maybe_cancel_and_set_remote_error(msg) + + if ( + self._in_overrun + ): + self._overflow_q.append(msg) + return False + + try: + send_chan.send_nowait(msg) + return True + # if an error is deteced we should always + # expect it to be raised by any context (stream) + # consumer task + + except trio.BrokenResourceError: + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? + + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") + return False + + # NOTE XXX: by default we do **not** maintain context-stream + # backpressure and instead opt to relay stream overrun errors to + # the sender; the main motivation is that using bp can block the + # msg handling loop which calls into this method! + except trio.WouldBlock: + # XXX: always push an error even if the local + # receiver is in overrun state. + # await self._maybe_cancel_and_set_remote_error(msg) + + local_uid = current_actor().uid + lines = [ + f'OVERRUN on actor-task context {cid}@{local_uid}!\n' + # TODO: put remote task name here if possible? + f'remote sender actor: {uid}', + # TODO: put task func name here and maybe an arrow + # from sender to overrunner? + # f'local task {self.func_name}' + ] + if not self._stream_opened: + lines.insert( + 1, + f'\n*** No stream open on `{local_uid[0]}` side! ***\n' + ) + + text = '\n'.join(lines) + + # XXX: lul, this really can't be backpressure since any + # blocking here will block the entire msg loop rpc sched for + # a whole channel.. maybe we should rename it? + if self._allow_overruns: + text += f'\nStarting overflow queuing task on msg: {msg}' + log.warning(text) + if ( + not self._in_overrun + ): + self._overflow_q.append(msg) + n = self._scope_nursery + assert not n.child_tasks + try: + n.start_soon( + self._drain_overflows, + ) + except RuntimeError: + # if the nursery is already cancelled due to + # this context exiting or in error, we ignore + # the nursery error since we never expected + # anything different. + return False + else: + try: + raise StreamOverrun(text) + except StreamOverrun as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + try: + await chan.send(err_msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{chan} is already closed") + + return False + + +def mk_context( + chan: Channel, + cid: str, + msg_buffer_size: int = 2**6, + + **kwargs, + +) -> Context: + ''' + Internal factory to create an inter-actor task ``Context``. + + This is called by internals and should generally never be called + by user code. + + ''' + send_chan: trio.MemorySendChannel + recv_chan: trio.MemoryReceiveChannel + send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) + + ctx = Context( + chan, + cid, + _send_chan=send_chan, + _recv_chan=recv_chan, + **kwargs, + ) + ctx._result = id(ctx) + return ctx + + +def context(func: Callable) -> Callable: + ''' + Mark an async function as a streaming routine with ``@context``. + + ''' + # TODO: apply whatever solution ``mypy`` ends up picking for this: + # https://github.com/python/mypy/issues/2087#issuecomment-769266912 + func._tractor_context_function = True # type: ignore + + sig = inspect.signature(func) + params = sig.parameters + if 'ctx' not in params: + raise TypeError( + "The first argument to the context function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) + return func diff --git a/tractor/_portal.py b/tractor/_portal.py index e61ac37..bf3e385 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -45,10 +45,8 @@ from ._exceptions import ( NoResult, ContextCancelled, ) -from ._streaming import ( - Context, - MsgStream, -) +from ._context import Context +from ._streaming import MsgStream log = get_logger(__name__) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 5bc8b1f..84583e9 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -46,7 +46,7 @@ import trio # type: ignore from trio_typing import TaskStatus from ._ipc import Channel -from ._streaming import ( +from ._context import ( mk_context, Context, ) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 4bf6d1c..3045b83 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -14,25 +14,18 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Message stream types and APIs. -""" +The machinery and types behind ``Context.open_stream()`` + +''' from __future__ import annotations import inspect -from contextlib import asynccontextmanager -from collections import deque -from dataclasses import ( - dataclass, - field, -) -from functools import partial -from pprint import pformat +from contextlib import asynccontextmanager as acm from typing import ( Any, - Optional, Callable, - AsyncGenerator, AsyncIterator, TYPE_CHECKING, ) @@ -40,22 +33,17 @@ import warnings import trio -from ._ipc import Channel from ._exceptions import ( unpack_error, - pack_error, - ContextCancelled, - StreamOverrun, ) from .log import get_logger -from ._state import current_actor from .trionics import ( broadcast_receiver, BroadcastReceiver, ) if TYPE_CHECKING: - from ._portal import Portal + from ._context import Context log = get_logger(__name__) @@ -87,9 +75,9 @@ class MsgStream(trio.abc.Channel): ''' def __init__( self, - ctx: 'Context', # typing: ignore # noqa + ctx: Context, # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, - _broadcaster: Optional[BroadcastReceiver] = None, + _broadcaster: BroadcastReceiver | None = None, ) -> None: self._ctx = ctx @@ -292,7 +280,7 @@ class MsgStream(trio.abc.Channel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). - @asynccontextmanager + @acm async def subscribe( self, @@ -361,693 +349,6 @@ class MsgStream(trio.abc.Channel): await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) -@dataclass -class Context: - ''' - An inter-actor, ``trio`` task communication context. - - NB: This class should never be instatiated directly, it is delivered - by either, - - runtime machinery to a remotely started task or, - - by entering ``Portal.open_context()``. - - Allows maintaining task or protocol specific state between - 2 communicating actor tasks. A unique context is created on the - callee side/end for every request to a remote actor from a portal. - - A context can be cancelled and (possibly eventually restarted) from - either side of the underlying IPC channel, open task oriented - message streams and acts as an IPC aware inter-actor-task cancel - scope. - - ''' - chan: Channel - cid: str - - # these are the "feeder" channels for delivering - # message values to the local task from the runtime - # msg processing loop. - _recv_chan: trio.MemoryReceiveChannel - _send_chan: trio.MemorySendChannel - - _remote_func_type: str | None = None - - # only set on the caller side - _portal: Portal | None = None # type: ignore # noqa - _result: Any | int = None - _remote_error: BaseException | None = None - - # cancellation state - _cancel_called: bool = False - _cancel_called_remote: tuple | None = None - _cancel_msg: str | None = None - _scope: trio.CancelScope | None = None - _enter_debugger_on_cancel: bool = True - - @property - def cancel_called(self) -> bool: - ''' - Records whether cancellation has been requested for this context - by either an explicit call to ``.cancel()`` or an implicit call - due to an error caught inside the ``Portal.open_context()`` - block. - - ''' - return self._cancel_called - - @property - def cancel_called_remote(self) -> tuple[str, str] | None: - ''' - ``Actor.uid`` of the remote actor who's task was cancelled - causing this side of the context to also be cancelled. - - ''' - remote_uid = self._cancel_called_remote - if remote_uid: - return tuple(remote_uid) - - @property - def cancelled_caught(self) -> bool: - return self._scope.cancelled_caught - - # init and streaming state - _started_called: bool = False - _started_received: bool = False - _stream_opened: bool = False - - # overrun handling machinery - # NOTE: none of this provides "backpressure" to the remote - # task, only an ability to not lose messages when the local - # task is configured to NOT transmit ``StreamOverrun``s back - # to the other side. - _overflow_q: deque[dict] = field( - default_factory=partial( - deque, - maxlen=616, - ) - ) - _scope_nursery: trio.Nursery | None = None - _in_overrun: bool = False - _allow_overruns: bool = False - - async def send_yield( - self, - data: Any, - - ) -> None: - - warnings.warn( - "`Context.send_yield()` is now deprecated. " - "Use ``MessageStream.send()``. ", - DeprecationWarning, - stacklevel=2, - ) - await self.chan.send({'yield': data, 'cid': self.cid}) - - async def send_stop(self) -> None: - await self.chan.send({'stop': True, 'cid': self.cid}) - - async def _maybe_cancel_and_set_remote_error( - self, - error_msg: dict[str, Any], - - ) -> None: - ''' - (Maybe) unpack and raise a msg error into the local scope - nursery for this context. - - Acts as a form of "relay" for a remote error raised - in the corresponding remote callee task. - - ''' - # If this is an error message from a context opened by - # ``Portal.open_context()`` we want to interrupt any ongoing - # (child) tasks within that context to be notified of the remote - # error relayed here. - # - # The reason we may want to raise the remote error immediately - # is that there is no guarantee the associated local task(s) - # will attempt to read from any locally opened stream any time - # soon. - # - # NOTE: this only applies when - # ``Portal.open_context()`` has been called since it is assumed - # (currently) that other portal APIs (``Portal.run()``, - # ``.run_in_actor()``) do their own error checking at the point - # of the call and result processing. - error = unpack_error( - error_msg, - self.chan, - ) - - # XXX: set the remote side's error so that after we cancel - # whatever task is the opener of this context it can raise - # that error as the reason. - self._remote_error = error - - if ( - isinstance(error, ContextCancelled) - ): - log.cancel( - 'Remote task-context sucessfully cancelled for ' - f'{self.chan.uid}:{self.cid}' - ) - - if self._cancel_called: - # this is an expected cancel request response message - # and we don't need to raise it in scope since it will - # potentially override a real error - return - else: - log.error( - f'Remote context error for {self.chan.uid}:{self.cid}:\n' - f'{error_msg["error"]["tb_str"]}' - ) - # TODO: tempted to **not** do this by-reraising in a - # nursery and instead cancel a surrounding scope, detect - # the cancellation, then lookup the error that was set? - # YES! this is way better and simpler! - if ( - self._scope - ): - # from trio.testing import wait_all_tasks_blocked - # await wait_all_tasks_blocked() - self._cancel_called_remote = self.chan.uid - self._scope.cancel() - - # NOTE: this usage actually works here B) - # from ._debug import breakpoint - # await breakpoint() - - # XXX: this will break early callee results sending - # since when `.result()` is finally called, this - # chan will be closed.. - # if self._recv_chan: - # await self._recv_chan.aclose() - - async def cancel( - self, - msg: str | None = None, - timeout: float = 0.5, - # timeout: float = 1000, - - ) -> None: - ''' - Cancel this inter-actor-task context. - - Request that the far side cancel it's current linked context, - Timeout quickly in an attempt to sidestep 2-generals... - - ''' - side = 'caller' if self._portal else 'callee' - if msg: - assert side == 'callee', 'Only callee side can provide cancel msg' - - log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') - - self._cancel_called = True - # await _debug.breakpoint() - # breakpoint() - - if side == 'caller': - if not self._portal: - raise RuntimeError( - "No portal found, this is likely a callee side context" - ) - - cid = self.cid - with trio.move_on_after(timeout) as cs: - # cs.shield = True - log.cancel( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") - - # NOTE: we're telling the far end actor to cancel a task - # corresponding to *this actor*. The far end local channel - # instance is passed to `Actor._cancel_task()` implicitly. - await self._portal.run_from_ns( - 'self', - '_cancel_task', - cid=cid, - ) - # print("EXITING CANCEL CALL") - - if cs.cancelled_caught: - # XXX: there's no way to know if the remote task was indeed - # cancelled in the case where the connection is broken or - # some other network error occurred. - # if not self._portal.channel.connected(): - if not self.chan.connected(): - log.cancel( - "May have failed to cancel remote task " - f"{cid} for {self._portal.channel.uid}") - else: - log.cancel( - "Timed out on cancelling remote task " - f"{cid} for {self._portal.channel.uid}") - - # callee side remote task - else: - self._cancel_msg = msg - - # TODO: should we have an explicit cancel message - # or is relaying the local `trio.Cancelled` as an - # {'error': trio.Cancelled, cid: "blah"} enough? - # This probably gets into the discussion in - # https://github.com/goodboy/tractor/issues/36 - assert self._scope - self._scope.cancel() - - @asynccontextmanager - async def open_stream( - - self, - allow_overruns: bool | None = False, - msg_buffer_size: int | None = None, - - ) -> AsyncGenerator[MsgStream, None]: - ''' - Open a ``MsgStream``, a bi-directional stream connected to the - cross-actor (far end) task for this ``Context``. - - This context manager must be entered on both the caller and - callee for the stream to logically be considered "connected". - - A ``MsgStream`` is currently "one-shot" use, meaning if you - close it you can not "re-open" it for streaming and instead you - must re-establish a new surrounding ``Context`` using - ``Portal.open_context()``. In the future this may change but - currently there seems to be no obvious reason to support - "re-opening": - - pausing a stream can be done with a message. - - task errors will normally require a restart of the entire - scope of the inter-actor task context due to the nature of - ``trio``'s cancellation system. - - ''' - actor = current_actor() - - # here we create a mem chan that corresponds to the - # far end caller / callee. - - # Likewise if the surrounding context has been cancelled we error here - # since it likely means the surrounding block was exited or - # killed - - if self._cancel_called: - task = trio.lowlevel.current_task().name - raise ContextCancelled( - f'Context around {actor.uid[0]}:{task} was already cancelled!' - ) - - if not self._portal and not self._started_called: - raise RuntimeError( - 'Context.started()` must be called before opening a stream' - ) - - # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.start_remote_task()` so if you try - # to send a stop from the caller to the callee in the - # single-direction-stream case you'll get a lookup error - # currently. - ctx = actor.get_context( - self.chan, - self.cid, - msg_buffer_size=msg_buffer_size, - allow_overruns=allow_overruns, - ) - ctx._allow_overruns = allow_overruns - assert ctx is self - - # XXX: If the underlying channel feeder receive mem chan has - # been closed then likely client code has already exited - # a ``.open_stream()`` block prior or there was some other - # unanticipated error or cancellation from ``trio``. - - if ctx._recv_chan._closed: - raise trio.ClosedResourceError( - 'The underlying channel for this stream was already closed!?') - - async with MsgStream( - ctx=self, - rx_chan=ctx._recv_chan, - ) as stream: - - if self._portal: - self._portal._streams.add(stream) - - try: - self._stream_opened = True - - # XXX: do we need this? - # ensure we aren't cancelled before yielding the stream - # await trio.lowlevel.checkpoint() - yield stream - - # NOTE: Make the stream "one-shot use". On exit, signal - # ``trio.EndOfChannel``/``StopAsyncIteration`` to the - # far end. - await stream.aclose() - - finally: - if self._portal: - try: - self._portal._streams.remove(stream) - except KeyError: - log.warning( - f'Stream was already destroyed?\n' - f'actor: {self.chan.uid}\n' - f'ctx id: {self.cid}' - ) - - def _maybe_raise_remote_err( - self, - err: Exception, - ) -> None: - # NOTE: whenever the context's "opener" side (task) **is** - # the side which requested the cancellation (likekly via - # ``Context.cancel()``), we don't want to re-raise that - # cancellation signal locally (would be akin to - # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` - # whenever ``CancelScope.cancel()`` was called) and instead - # silently reap the expected cancellation "error"-msg. - # if 'pikerd' in err.msgdata['tb_str']: - # # from . import _debug - # # await _debug.breakpoint() - # breakpoint() - - if ( - isinstance(err, ContextCancelled) - and ( - self._cancel_called - or self.chan._cancel_called - or tuple(err.canceller) == current_actor().uid - ) - ): - return err - - raise err # from None - - async def result(self) -> Any | Exception: - ''' - From some (caller) side task, wait for and return the final - result from the remote (callee) side's task. - - This provides a mechanism for one task running in some actor to wait - on another task at the other side, in some other actor, to terminate. - - If the remote task is still in a streaming state (it is delivering - values from inside a ``Context.open_stream():`` block, then those - msgs are drained but discarded since it is presumed this side of - the context has already finished with its own streaming logic. - - If the remote context (or its containing actor runtime) was - canceled, either by a local task calling one of - ``Context.cancel()`` or `Portal.cancel_actor()``, we ignore the - received ``ContextCancelled`` exception if the context or - underlying IPC channel is marked as having been "cancel called". - This is similar behavior to using ``trio.Nursery.cancel()`` - wherein tasks which raise ``trio.Cancel`` are silently reaped; - the main different in this API is in the "cancel called" case, - instead of just not raising, we also return the exception *as - the result* since client code may be interested in the details - of the remote cancellation. - - ''' - assert self._portal, "Context.result() can not be called from callee!" - assert self._recv_chan - - # from . import _debug - # await _debug.breakpoint() - - re = self._remote_error - if re: - self._maybe_raise_remote_err(re) - return re - - if ( - self._result == id(self) - and not self._remote_error - and not self._recv_chan._closed # type: ignore - ): - # wait for a final context result consuming - # and discarding any bi dir stream msgs still - # in transit from the far end. - while True: - msg = await self._recv_chan.receive() - try: - self._result = msg['return'] - - # NOTE: we don't need to do this right? - # XXX: only close the rx mem chan AFTER - # a final result is retreived. - # if self._recv_chan: - # await self._recv_chan.aclose() - - break - except KeyError: # as msgerr: - - if 'yield' in msg: - # far end task is still streaming to us so discard - log.warning(f'Discarding stream delivered {msg}') - continue - - elif 'stop' in msg: - log.debug('Remote stream terminated') - continue - - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - - err = unpack_error( - msg, - self._portal.channel - ) # from msgerr - - err = self._maybe_raise_remote_err(err) - self._remote_err = err - - return self._remote_error or self._result - - async def started( - self, - value: Any | None = None - - ) -> None: - ''' - Indicate to calling actor's task that this linked context - has started and send ``value`` to the other side. - - On the calling side ``value`` is the second item delivered - in the tuple returned by ``Portal.open_context()``. - - ''' - if self._portal: - raise RuntimeError( - f"Caller side context {self} can not call started!") - - elif self._started_called: - raise RuntimeError( - f"called 'started' twice on context with {self.chan.uid}") - - await self.chan.send({'started': value, 'cid': self.cid}) - self._started_called = True - - # TODO: do we need a restart api? - # async def restart(self) -> None: - # pass - - async def _drain_overflows( - self, - ) -> None: - ''' - Private task spawned to push newly received msgs to the local - task which getting overrun by the remote side. - - In order to not block the rpc msg loop, but also not discard - msgs received in this context, we need to async push msgs in - a new task which only runs for as long as the local task is in - an overrun state. - - ''' - self._in_overrun = True - try: - while self._overflow_q: - # NOTE: these msgs should never be errors since we always do - # the check prior to checking if we're in an overrun state - # inside ``.deliver_msg()``. - msg = self._overflow_q.popleft() - try: - await self._send_chan.send(msg) - except trio.BrokenResourceError: - log.warning( - f"{self._send_chan} consumer is already closed" - ) - return - except trio.Cancelled: - # we are obviously still in overrun - # but the context is being closed anyway - # so we just warn that there are un received - # msgs still.. - self._overflow_q.appendleft(msg) - fmt_msgs = '' - for msg in self._overflow_q: - fmt_msgs += f'{pformat(msg)}\n' - - log.warning( - f'Context for {self.cid} is being closed while ' - 'in an overrun state!\n' - 'Discarding the following msgs:\n' - f'{fmt_msgs}\n' - ) - raise - - finally: - # task is now finished with the backlog so mark us as - # no longer in backlog. - self._in_overrun = False - - async def _deliver_msg( - self, - msg: dict, - - draining: bool = False, - - ) -> bool: - - cid = self.cid - chan = self.chan - uid = chan.uid - send_chan: trio.MemorySendChannel = self._send_chan - - log.runtime( - f"Delivering {msg} from {uid} to caller {cid}" - ) - - error = msg.get('error') - if error: - await self._maybe_cancel_and_set_remote_error(msg) - - if ( - self._in_overrun - ): - self._overflow_q.append(msg) - return False - - try: - send_chan.send_nowait(msg) - return True - # if an error is deteced we should always - # expect it to be raised by any context (stream) - # consumer task - - except trio.BrokenResourceError: - # TODO: what is the right way to handle the case where the - # local task has already sent a 'stop' / StopAsyncInteration - # to the other side but and possibly has closed the local - # feeder mem chan? Do we wait for some kind of ack or just - # let this fail silently and bubble up (currently)? - - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{send_chan} consumer is already closed") - return False - - # NOTE XXX: by default we do **not** maintain context-stream - # backpressure and instead opt to relay stream overrun errors to - # the sender; the main motivation is that using bp can block the - # msg handling loop which calls into this method! - except trio.WouldBlock: - # XXX: always push an error even if the local - # receiver is in overrun state. - # await self._maybe_cancel_and_set_remote_error(msg) - - local_uid = current_actor().uid - lines = [ - f'OVERRUN on actor-task context {cid}@{local_uid}!\n' - # TODO: put remote task name here if possible? - f'remote sender actor: {uid}', - # TODO: put task func name here and maybe an arrow - # from sender to overrunner? - # f'local task {self.func_name}' - ] - if not self._stream_opened: - lines.insert( - 1, - f'\n*** No stream open on `{local_uid[0]}` side! ***\n' - ) - - text = '\n'.join(lines) - - # XXX: lul, this really can't be backpressure since any - # blocking here will block the entire msg loop rpc sched for - # a whole channel.. maybe we should rename it? - if self._allow_overruns: - text += f'\nStarting overflow queuing task on msg: {msg}' - log.warning(text) - if ( - not self._in_overrun - ): - self._overflow_q.append(msg) - n = self._scope_nursery - assert not n.child_tasks - try: - n.start_soon( - self._drain_overflows, - ) - except RuntimeError: - # if the nursery is already cancelled due to - # this context exiting or in error, we ignore - # the nursery error since we never expected - # anything different. - return False - else: - try: - raise StreamOverrun(text) - except StreamOverrun as err: - err_msg = pack_error(err) - err_msg['cid'] = cid - try: - await chan.send(err_msg) - except trio.BrokenResourceError: - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{chan} is already closed") - - return False - - -def mk_context( - chan: Channel, - cid: str, - msg_buffer_size: int = 2**6, - - **kwargs, - -) -> Context: - ''' - Internal factory to create an inter-actor task ``Context``. - - This is called by internals and should generally never be called - by user code. - - ''' - send_chan: trio.MemorySendChannel - recv_chan: trio.MemoryReceiveChannel - send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) - - ctx = Context( - chan, - cid, - _send_chan=send_chan, - _recv_chan=recv_chan, - **kwargs, - ) - ctx._result = id(ctx) - return ctx - - def stream(func: Callable) -> Callable: ''' Mark an async function as a streaming routine with ``@stream``. @@ -1078,22 +379,3 @@ def stream(func: Callable) -> Callable: "(Or ``to_trio`` if using ``asyncio`` in guest mode)." ) return func - - -def context(func: Callable) -> Callable: - ''' - Mark an async function as a streaming routine with ``@context``. - - ''' - # TODO: apply whatever solution ``mypy`` ends up picking for this: - # https://github.com/python/mypy/issues/2087#issuecomment-769266912 - func._tractor_context_function = True # type: ignore - - sig = inspect.signature(func) - params = sig.parameters - if 'ctx' not in params: - raise TypeError( - "The first argument to the context function " - f"{func.__name__} must be `ctx: tractor.Context`" - ) - return func diff --git a/tractor/experimental/_pubsub.py b/tractor/experimental/_pubsub.py index 99117b0..89f286d 100644 --- a/tractor/experimental/_pubsub.py +++ b/tractor/experimental/_pubsub.py @@ -37,7 +37,7 @@ import trio import wrapt from ..log import get_logger -from .._streaming import Context +from .._context import Context __all__ = ['pub'] @@ -148,7 +148,8 @@ def pub( *, tasks: set[str] = set(), ): - """Publisher async generator decorator. + ''' + Publisher async generator decorator. A publisher can be called multiple times from different actors but will only spawn a finite set of internal tasks to stream values to @@ -227,7 +228,8 @@ def pub( running in a single actor to stream data to an arbitrary number of subscribers. If you are ok to have a new task running for every call to ``pub_service()`` then probably don't need this. - """ + + ''' global _pubtask2lock # handle the decorator not called with () case