diff --git a/tractor/_context.py b/tractor/_context.py index 2230598..abcb90e 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -25,26 +25,31 @@ disjoint, parallel executing tasks in separate actors. ''' from __future__ import annotations from collections import deque -from contextlib import asynccontextmanager as acm -from contextvars import ContextVar +from contextlib import ( + asynccontextmanager as acm, +) from dataclasses import ( dataclass, field, ) from functools import partial import inspect -import msgspec from pprint import pformat from typing import ( Any, Callable, AsyncGenerator, + Type, TYPE_CHECKING, + Union, ) import warnings - +# ------ - ------ import trio - +from msgspec import ( + ValidationError, +) +# ------ - ------ from ._exceptions import ( ContextCancelled, InternalError, @@ -53,7 +58,6 @@ from ._exceptions import ( StreamOverrun, pack_from_raise, unpack_error, - _raise_from_no_key_in_msg, ) from .log import get_logger from .msg import ( @@ -70,8 +74,12 @@ from .msg import ( current_codec, pretty_struct, types as msgtypes, + _ops as msgops, +) +from ._ipc import ( + Channel, + _mk_msg_type_err, ) -from ._ipc import Channel from ._streaming import MsgStream from ._state import ( current_actor, @@ -86,294 +94,9 @@ if TYPE_CHECKING: CallerInfo, ) - log = get_logger(__name__) -async def _drain_to_final_msg( - ctx: Context, - - hide_tb: bool = True, - msg_limit: int = 6, - -) -> tuple[ - Return|None, - list[MsgType] -]: - ''' - Drain IPC msgs delivered to the underlying rx-mem-chan - `Context._recv_chan` from the runtime in search for a final - result or error msg. - - The motivation here is to ideally capture errors during ctxc - conditions where a canc-request/or local error is sent but the - local task also excepts and enters the - `Portal.open_context().__aexit__()` block wherein we prefer to - capture and raise any remote error or ctxc-ack as part of the - `ctx.result()` cleanup and teardown sequence. - - ''' - __tracebackhide__: bool = hide_tb - raise_overrun: bool = not ctx._allow_overruns - - # wait for a final context result by collecting (but - # basically ignoring) any bi-dir-stream msgs still in transit - # from the far end. - pre_result_drained: list[MsgType] = [] - return_msg: Return|None = None - while not ( - ctx.maybe_error - and not ctx._final_result_is_set() - ): - try: - # TODO: can remove? - # await trio.lowlevel.checkpoint() - - # NOTE: this REPL usage actually works here dawg! Bo - # from .devx._debug import pause - # await pause() - - # TODO: bad idea? - # -[ ] wrap final outcome channel wait in a scope so - # it can be cancelled out of band if needed? - # - # with trio.CancelScope() as res_cs: - # ctx._res_scope = res_cs - # msg: dict = await ctx._recv_chan.receive() - # if res_cs.cancelled_caught: - - # TODO: ensure there's no more hangs, debugging the - # runtime pretty preaase! - # from .devx._debug import pause - # await pause() - - # TODO: can remove this finally? - # we have no more need for the sync draining right - # since we're can kinda guarantee the async - # `.receive()` below will never block yah? - # - # if ( - # ctx._cancel_called and ( - # ctx.cancel_acked - # # or ctx.chan._cancel_called - # ) - # # or not ctx._final_result_is_set() - # # ctx.outcome is not - # # or ctx.chan._closed - # ): - # try: - # msg: dict = await ctx._recv_chan.receive_nowait()() - # except trio.WouldBlock: - # log.warning( - # 'When draining already `.cancel_called` ctx!\n' - # 'No final msg arrived..\n' - # ) - # break - # else: - # msg: dict = await ctx._recv_chan.receive() - - # TODO: don't need it right jefe? - # with trio.move_on_after(1) as cs: - # if cs.cancelled_caught: - # from .devx._debug import pause - # await pause() - - # pray to the `trio` gawds that we're corrent with this - # msg: dict = await ctx._recv_chan.receive() - msg: MsgType = await ctx._recv_chan.receive() - - # NOTE: we get here if the far end was - # `ContextCancelled` in 2 cases: - # 1. we requested the cancellation and thus - # SHOULD NOT raise that far end error, - # 2. WE DID NOT REQUEST that cancel and thus - # SHOULD RAISE HERE! - except trio.Cancelled: - - # CASE 2: mask the local cancelled-error(s) - # only when we are sure the remote error is - # the source cause of this local task's - # cancellation. - ctx.maybe_raise() - - # CASE 1: we DID request the cancel we simply - # continue to bubble up as normal. - raise - - match msg: - - # final result arrived! - case Return( - # cid=cid, - pld=res, - ): - ctx._result: Any = res - log.runtime( - 'Context delivered final draining msg:\n' - f'{pformat(msg)}' - ) - # XXX: only close the rx mem chan AFTER - # a final result is retreived. - # if ctx._recv_chan: - # await ctx._recv_chan.aclose() - # TODO: ^ we don't need it right? - return_msg = msg - break - - # far end task is still streaming to us so discard - # and report depending on local ctx state. - case Yield(): - pre_result_drained.append(msg) - if ( - (ctx._stream.closed - and (reason := 'stream was already closed') - ) - or (ctx.cancel_acked - and (reason := 'ctx cancelled other side') - ) - or (ctx._cancel_called - and (reason := 'ctx called `.cancel()`') - ) - or (len(pre_result_drained) > msg_limit - and (reason := f'"yield" limit={msg_limit}') - ) - ): - log.cancel( - 'Cancelling `MsgStream` drain since ' - f'{reason}\n\n' - f'<= {ctx.chan.uid}\n' - f' |_{ctx._nsf}()\n\n' - f'=> {ctx._task}\n' - f' |_{ctx._stream}\n\n' - - f'{pformat(msg)}\n' - ) - return ( - return_msg, - pre_result_drained, - ) - - # drain up to the `msg_limit` hoping to get - # a final result or error/ctxc. - else: - log.warning( - 'Ignoring "yield" msg during `ctx.result()` drain..\n' - f'<= {ctx.chan.uid}\n' - f' |_{ctx._nsf}()\n\n' - f'=> {ctx._task}\n' - f' |_{ctx._stream}\n\n' - - f'{pformat(msg)}\n' - ) - continue - - # stream terminated, but no result yet.. - # - # TODO: work out edge cases here where - # a stream is open but the task also calls - # this? - # -[ ] should be a runtime error if a stream is open right? - # Stop() - case Stop(): - pre_result_drained.append(msg) - log.cancel( - 'Remote stream terminated due to "stop" msg:\n\n' - f'{pformat(msg)}\n' - ) - continue - - # remote error msg, likely already handled inside - # `Context._deliver_msg()` - case Error(): - # TODO: can we replace this with `ctx.maybe_raise()`? - # -[ ] would this be handier for this case maybe? - # async with maybe_raise_on_exit() as raises: - # if raises: - # log.error('some msg about raising..') - # - re: Exception|None = ctx._remote_error - if re: - assert msg is ctx._cancel_msg - # NOTE: this solved a super duper edge case XD - # this was THE super duper edge case of: - # - local task opens a remote task, - # - requests remote cancellation of far end - # ctx/tasks, - # - needs to wait for the cancel ack msg - # (ctxc) or some result in the race case - # where the other side's task returns - # before the cancel request msg is ever - # rxed and processed, - # - here this surrounding drain loop (which - # iterates all ipc msgs until the ack or - # an early result arrives) was NOT exiting - # since we are the edge case: local task - # does not re-raise any ctxc it receives - # IFF **it** was the cancellation - # requester.. - # - # XXX will raise if necessary but ow break - # from loop presuming any supressed error - # (ctxc) should terminate the context! - ctx._maybe_raise_remote_err( - re, - # NOTE: obvi we don't care if we - # overran the far end if we're already - # waiting on a final result (msg). - # raise_overrun_from_self=False, - raise_overrun_from_self=raise_overrun, - ) - - break # OOOOOF, yeah obvi we need this.. - - # XXX we should never really get here - # right! since `._deliver_msg()` should - # always have detected an {'error': ..} - # msg and already called this right!?! - elif error := unpack_error( - msg=msg, - chan=ctx._portal.channel, - hide_tb=False, - ): - log.critical('SHOULD NEVER GET HERE!?') - assert msg is ctx._cancel_msg - assert error.msgdata == ctx._remote_error.msgdata - assert error.ipc_msg == ctx._remote_error.ipc_msg - from .devx._debug import pause - await pause() - ctx._maybe_cancel_and_set_remote_error(error) - ctx._maybe_raise_remote_err(error) - - else: - # bubble the original src key error - raise - - # XXX should pretty much never get here unless someone - # overrides the default `MsgType` spec. - case _: - pre_result_drained.append(msg) - # It's definitely an internal error if any other - # msg type without a`'cid'` field arrives here! - if not msg.cid: - raise InternalError( - 'Unexpected cid-missing msg?\n\n' - f'{msg}\n' - ) - - raise RuntimeError('Unknown msg type: {msg}') - - else: - log.cancel( - 'Skipping `MsgStream` drain since final outcome is set\n\n' - f'{ctx.outcome}\n' - ) - - return ( - return_msg, - pre_result_drained, - ) - - class Unresolved: ''' Placeholder value for `Context._result` until @@ -423,9 +146,12 @@ class Context: # the "feeder" channels for delivering message values to the # local task from the runtime's msg processing loop. - _recv_chan: trio.MemoryReceiveChannel + _rx_chan: trio.MemoryReceiveChannel _send_chan: trio.MemorySendChannel + # payload receiver + _pld_rx: msgops.PldRx + # full "namespace-path" to target RPC function _nsf: NamespacePath @@ -447,7 +173,7 @@ class Context: _task: trio.lowlevel.Task|None = None # TODO: cs around result waiting so we can cancel any - # permanently blocking `._recv_chan.receive()` call in + # permanently blocking `._rx_chan.receive()` call in # a drain loop? # _res_scope: trio.CancelScope|None = None @@ -504,14 +230,6 @@ class Context: _started_called: bool = False _stream_opened: bool = False _stream: MsgStream|None = None - _pld_codec_var: ContextVar[MsgCodec] = ContextVar( - 'pld_codec', - default=_codec._def_msgspec_codec, # i.e. `Any`-payloads - ) - - @property - def pld_codec(self) -> MsgCodec|None: - return self._pld_codec_var.get() # caller of `Portal.open_context()` for # logging purposes mostly @@ -916,9 +634,8 @@ class Context: else: log.error( f'Remote context error:\n\n' - + # f'{pformat(self)}\n' f'{error}\n' - f'{pformat(self)}\n' ) # always record the cancelling actor's uid since its @@ -955,24 +672,49 @@ class Context: and not self._is_self_cancelled() and not cs.cancel_called and not cs.cancelled_caught - and ( - msgerr - and - # NOTE: allow user to config not cancelling the - # local scope on `MsgTypeError`s - self._cancel_on_msgerr - ) ): - # TODO: it'd sure be handy to inject our own - # `trio.Cancelled` subtype here ;) - # https://github.com/goodboy/tractor/issues/368 - log.cancel('Cancelling local `.open_context()` scope!') - self._scope.cancel() + if not ( + msgerr + # NOTE: we allow user to config not cancelling the + # local scope on `MsgTypeError`s + and not self._cancel_on_msgerr + ): + # TODO: it'd sure be handy to inject our own + # `trio.Cancelled` subtype here ;) + # https://github.com/goodboy/tractor/issues/368 + message: str = 'Cancelling `Context._scope` !\n\n' + self._scope.cancel() + + else: + message: str = ( + 'NOT Cancelling `Context._scope` since,\n' + f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' + f'AND we got a msg-type-error!\n' + f'{error}\n' + ) else: - log.cancel('NOT cancelling local `.open_context()` scope!') + message: str = 'NOT cancelling `Context._scope` !\n\n' + scope_info: str = 'No `self._scope: CancelScope` was set/used ?' + if cs: + scope_info: str = ( + f'self._scope: {cs}\n' + f'|_ .cancel_called: {cs.cancel_called}\n' + f'|_ .cancelled_caught: {cs.cancelled_caught}\n' + f'|_ ._cancel_status: {cs._cancel_status}\n\n' + f'{self}\n' + f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n' + f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n' + + f'msgerr: {msgerr}\n' + ) + log.cancel( + message + + + f'{scope_info}' + ) # TODO: maybe we should also call `._res_scope.cancel()` if it # exists to support cancelling any drain loop hangs? @@ -1256,7 +998,7 @@ class Context: # a ``.open_stream()`` block prior or there was some other # unanticipated error or cancellation from ``trio``. - if ctx._recv_chan._closed: + if ctx._rx_chan._closed: raise trio.ClosedResourceError( 'The underlying channel for this stream was already closed!\n' ) @@ -1276,7 +1018,7 @@ class Context: # stream WAS NOT just closed normally/gracefully. async with MsgStream( ctx=self, - rx_chan=ctx._recv_chan, + rx_chan=ctx._rx_chan, ) as stream: # NOTE: we track all existing streams per portal for @@ -1427,13 +1169,12 @@ class Context: # boxed `StreamOverrun`. This is mostly useful for # supressing such faults during # cancellation/error/final-result handling inside - # `_drain_to_final_msg()` such that we do not + # `msg._ops.drain_to_final_msg()` such that we do not # raise such errors particularly in the case where # `._cancel_called == True`. not raise_overrun_from_self and isinstance(remote_error, RemoteActorError) - - and remote_error.boxed_type_str == 'StreamOverrun' + and remote_error.boxed_type is StreamOverrun # and tuple(remote_error.msgdata['sender']) == our_uid and tuple(remote_error.sender) == our_uid @@ -1503,12 +1244,12 @@ class Context: if self._final_result_is_set(): return self._result - assert self._recv_chan + assert self._rx_chan raise_overrun: bool = not self._allow_overruns if ( self.maybe_error is None and - not self._recv_chan._closed # type: ignore + not self._rx_chan._closed # type: ignore ): # wait for a final context result/error by "draining" # (by more or less ignoring) any bi-dir-stream "yield" @@ -1516,7 +1257,7 @@ class Context: ( return_msg, drained_msgs, - ) = await _drain_to_final_msg( + ) = await msgops.drain_to_final_msg( ctx=self, hide_tb=hide_tb, ) @@ -1802,8 +1543,7 @@ class Context: await self.chan.send(started_msg) # raise any msg type error NO MATTER WHAT! - except msgspec.ValidationError as verr: - from tractor._ipc import _mk_msg_type_err + except ValidationError as verr: raise _mk_msg_type_err( msg=msg_bytes, codec=codec, @@ -1890,7 +1630,7 @@ class Context: - NEVER `return` early before delivering the msg! bc if the error is a ctxc and there is a task waiting on `.result()` we need the msg to be - `send_chan.send_nowait()`-ed over the `._recv_chan` so + `send_chan.send_nowait()`-ed over the `._rx_chan` so that the error is relayed to that waiter task and thus raised in user code! @@ -2201,24 +1941,11 @@ async def open_context_from_portal( # -> it's expected that if there is an error in this phase of # the dialog, the `Error` msg should be raised from the `msg` # handling block below. - msg: Started = await ctx._recv_chan.receive() - try: - # the "first" value here is delivered by the callee's - # ``Context.started()`` call. - # first: Any = msg['started'] - first: Any = msg.pld - ctx._started_called: bool = True - - # except KeyError as src_error: - except AttributeError as src_error: - log.exception('Raising from unexpected msg!\n') - _raise_from_no_key_in_msg( - ctx=ctx, - msg=msg, - src_err=src_error, - log=log, - expect_msg=Started, - ) + first: Any = await ctx._pld_rx.recv_pld( + ctx=ctx, + expect_msg=Started, + ) + ctx._started_called: bool = True uid: tuple = portal.channel.uid cid: str = ctx.cid @@ -2540,7 +2267,7 @@ async def open_context_from_portal( # we tear down the runtime feeder chan last # to avoid premature stream clobbers. if ( - (rxchan := ctx._recv_chan) + (rxchan := ctx._rx_chan) # maybe TODO: yes i know the below check is # touching `trio` memchan internals..BUT, there are @@ -2583,7 +2310,7 @@ async def open_context_from_portal( # underlying feeder channel is # once-and-only-CLOSED! with trio.CancelScope(shield=True): - await ctx._recv_chan.aclose() + await ctx._rx_chan.aclose() # XXX: we always raise remote errors locally and # generally speaking mask runtime-machinery related @@ -2628,9 +2355,9 @@ async def open_context_from_portal( # FINALLY, remove the context from runtime tracking and # exit! log.runtime( - 'Removing IPC ctx opened with peer\n' - f'{uid}\n' - f'|_{ctx}\n' + 'De-allocating IPC ctx opened with {ctx.side!r} peer \n' + f'uid: {uid}\n' + f'cid: {ctx.cid}\n' ) portal.actor._contexts.pop( (uid, cid), @@ -2643,6 +2370,7 @@ def mk_context( nsf: NamespacePath, msg_buffer_size: int = 2**6, + pld_spec: Union[Type] = Any, **kwargs, @@ -2662,12 +2390,18 @@ def mk_context( from .devx._code import find_caller_info caller_info: CallerInfo|None = find_caller_info() + pld_rx = msgops.PldRx( + # _rx_mc=recv_chan, + _msgdec=_codec.mk_dec(spec=pld_spec) + ) + ctx = Context( chan=chan, cid=cid, _actor=current_actor(), _send_chan=send_chan, - _recv_chan=recv_chan, + _rx_chan=recv_chan, + _pld_rx=pld_rx, _nsf=nsf, _task=trio.lowlevel.current_task(), _caller_info=caller_info, diff --git a/tractor/_portal.py b/tractor/_portal.py index 052dd8e..9726897 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -31,7 +31,7 @@ from typing import ( Any, Callable, AsyncGenerator, - # Type, + TYPE_CHECKING, ) from functools import partial from dataclasses import dataclass @@ -46,12 +46,12 @@ from ._state import ( from ._ipc import Channel from .log import get_logger from .msg import ( - Error, + # Error, NamespacePath, Return, ) from ._exceptions import ( - unpack_error, + # unpack_error, NoResult, ) from ._context import ( @@ -62,42 +62,44 @@ from ._streaming import ( MsgStream, ) +if TYPE_CHECKING: + from ._runtime import Actor log = get_logger(__name__) -# TODO: rename to `unwrap_result()` and use -# `._raise_from_no_key_in_msg()` (after tweak to -# accept a `chan: Channel` arg) in key block! -def _unwrap_msg( - msg: Return|Error, - channel: Channel, +# TODO: remove and/or rework? +# -[ ] rename to `unwrap_result()` and use +# `._raise_from_unexpected_msg()` (after tweak to accept a `chan: +# Channel` arg) in key block?? +# -[ ] pretty sure this is entirely covered by +# `_exceptions._raise_from_unexpected_msg()` so REMOVE! +# def _unwrap_msg( +# msg: Return|Error, +# ctx: Context, - hide_tb: bool = True, +# hide_tb: bool = True, -) -> Any: - ''' - Unwrap a final result from a `{return: }` IPC msg. +# ) -> Any: +# ''' +# Unwrap a final result from a `{return: }` IPC msg. - ''' - __tracebackhide__: bool = hide_tb +# ''' +# __tracebackhide__: bool = hide_tb +# try: +# return msg.pld +# except AttributeError as err: - try: - return msg.pld - # return msg['return'] - # except KeyError as ke: - except AttributeError as err: +# # internal error should never get here +# # assert msg.get('cid'), ( +# assert msg.cid, ( +# "Received internal error at portal?" +# ) - # internal error should never get here - # assert msg.get('cid'), ( - assert msg.cid, ( - "Received internal error at portal?" - ) - - raise unpack_error( - msg, - channel - ) from err +# raise unpack_error( +# msg, +# ctx.chan, +# ) from err class Portal: @@ -123,17 +125,21 @@ class Portal: # connected (peer) actors. cancel_timeout: float = 0.5 - def __init__(self, channel: Channel) -> None: + def __init__( + self, + channel: Channel, + ) -> None: + self.chan = channel # during the portal's lifetime - self._result_msg: dict|None = None + self._final_result: Any|None = None # When set to a ``Context`` (when _submit_for_result is called) # it is expected that ``result()`` will be awaited at some # point. - self._expect_result: Context | None = None + self._expect_result_ctx: Context|None = None self._streams: set[MsgStream] = set() - self.actor = current_actor() + self.actor: Actor = current_actor() @property def channel(self) -> Channel: @@ -147,6 +153,7 @@ class Portal: ) return self.chan + # TODO: factor this out into an `ActorNursery` wrapper async def _submit_for_result( self, ns: str, @@ -154,27 +161,18 @@ class Portal: **kwargs ) -> None: - assert self._expect_result is None, ( - "A pending main result has already been submitted" - ) + if self._expect_result_ctx is not None: + raise RuntimeError( + 'A pending main result has already been submitted' + ) - self._expect_result = await self.actor.start_remote_task( + self._expect_result_ctx = await self.actor.start_remote_task( self.channel, nsf=NamespacePath(f'{ns}:{func}'), kwargs=kwargs, portal=self, ) - async def _return_once( - self, - ctx: Context, - - ) -> Return: - - assert ctx._remote_func_type == 'asyncfunc' # single response - msg: Return = await ctx._recv_chan.receive() - return msg - async def result(self) -> Any: ''' Return the result(s) from the remote actor's "main" task. @@ -188,7 +186,7 @@ class Portal: raise exc # not expecting a "main" result - if self._expect_result is None: + if self._expect_result_ctx is None: log.warning( f"Portal for {self.channel.uid} not expecting a final" " result?\nresult() should only be called if subactor" @@ -196,17 +194,15 @@ class Portal: return NoResult # expecting a "main" result - assert self._expect_result + assert self._expect_result_ctx - if self._result_msg is None: - self._result_msg = await self._return_once( - self._expect_result + if self._final_result is None: + self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld( + ctx=self._expect_result_ctx, + expect_msg=Return, ) - return _unwrap_msg( - self._result_msg, - self.channel, - ) + return self._final_result async def _cancel_streams(self): # terminate all locally running async generator @@ -337,11 +333,9 @@ class Portal: kwargs=kwargs, portal=self, ) - ctx._portal: Portal = self - msg: Return = await self._return_once(ctx) - return _unwrap_msg( - msg, - self.channel, + return await ctx._pld_rx.recv_pld( + ctx=ctx, + expect_msg=Return, ) async def run( @@ -391,10 +385,9 @@ class Portal: kwargs=kwargs, portal=self, ) - ctx._portal = self - return _unwrap_msg( - await self._return_once(ctx), - self.channel, + return await ctx._pld_rx.recv_pld( + ctx=ctx, + expect_msg=Return, ) @acm @@ -436,7 +429,7 @@ class Portal: # deliver receive only stream async with MsgStream( ctx=ctx, - rx_chan=ctx._recv_chan, + rx_chan=ctx._rx_chan, ) as rchan: self._streams.add(rchan) yield rchan diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 72866d4..3e4066e 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -817,8 +817,8 @@ class Actor: state.max_buffer_size = msg_buffer_size except KeyError: - log.runtime( - f'Creating NEW IPC ctx for\n' + log.debug( + f'Allocate new IPC ctx for\n' f'peer: {chan.uid}\n' f'cid: {cid}\n' ) @@ -906,7 +906,7 @@ class Actor: # this should be immediate and does not (yet) wait for the # remote child task to sync via `Context.started()`. with trio.fail_after(ack_timeout): - first_msg: msgtypes.StartAck = await ctx._recv_chan.receive() + first_msg: msgtypes.StartAck = await ctx._rx_chan.receive() try: functype: str = first_msg.functype except AttributeError: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 16e32ce..764b7c1 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -35,7 +35,7 @@ import warnings import trio from ._exceptions import ( - _raise_from_no_key_in_msg, + # _raise_from_no_key_in_msg, ContextCancelled, ) from .log import get_logger @@ -44,8 +44,9 @@ from .trionics import ( BroadcastReceiver, ) from tractor.msg import ( - Return, - Stop, + # Return, + # Stop, + MsgType, Yield, ) @@ -94,24 +95,23 @@ class MsgStream(trio.abc.Channel): self._eoc: bool|trio.EndOfChannel = False self._closed: bool|trio.ClosedResourceError = False + # TODO: could we make this a direct method bind to `PldRx`? + # -> receive_nowait = PldRx.recv_pld + # |_ means latter would have to accept `MsgStream`-as-`self`? + # => should be fine as long as, + # -[ ] both define `._rx_chan` + # -[ ] .ctx is bound into `PldRx` using a `@cm`? + # # delegate directly to underlying mem channel def receive_nowait( self, - allow_msgs: list[str] = Yield, + expect_msg: MsgType = Yield, ): - msg: Yield|Stop = self._rx_chan.receive_nowait() - # TODO: replace msg equiv of this or does the `.pld` - # interface read already satisfy it? I think so, yes? - try: - return msg.pld - except AttributeError as attrerr: - _raise_from_no_key_in_msg( - ctx=self._ctx, - msg=msg, - src_err=attrerr, - log=log, - stream=self, - ) + ctx: Context = self._ctx + return ctx._pld_rx.recv_pld_nowait( + ctx=ctx, + expect_msg=expect_msg, + ) async def receive( self, @@ -146,24 +146,9 @@ class MsgStream(trio.abc.Channel): src_err: Exception|None = None # orig tb try: - try: - msg: Yield = await self._rx_chan.receive() - return msg.pld - # TODO: implement with match: instead? - except AttributeError as attrerr: - # src_err = kerr - src_err = attrerr - - # NOTE: may raise any of the below error types - # includg EoC when a 'stop' msg is found. - _raise_from_no_key_in_msg( - ctx=self._ctx, - msg=msg, - src_err=attrerr, - log=log, - stream=self, - ) + ctx: Context = self._ctx + return await ctx._pld_rx.recv_pld(ctx=ctx) # XXX: the stream terminates on either of: # - via `self._rx_chan.receive()` raising after manual closure @@ -228,7 +213,7 @@ class MsgStream(trio.abc.Channel): # probably want to instead raise the remote error # over the end-of-stream connection error since likely # the remote error was the source cause? - ctx: Context = self._ctx + # ctx: Context = self._ctx ctx.maybe_raise( raise_ctxc_from_self_call=True, ) @@ -292,7 +277,8 @@ class MsgStream(trio.abc.Channel): while not drained: try: maybe_final_msg = self.receive_nowait( - allow_msgs=[Yield, Return], + # allow_msgs=[Yield, Return], + expect_msg=Yield, ) if maybe_final_msg: log.debug( @@ -472,6 +458,9 @@ class MsgStream(trio.abc.Channel): self, # use memory channel size by default self._rx_chan._state.max_buffer_size, # type: ignore + + # TODO: can remove this kwarg right since + # by default behaviour is to do this anyway? receive_afunc=self.receive, ) @@ -517,19 +506,11 @@ class MsgStream(trio.abc.Channel): raise self._closed try: - # await self._ctx.chan.send( - # payload={ - # 'yield': data, - # 'cid': self._ctx.cid, - # }, - # # hide_tb=hide_tb, - # ) await self._ctx.chan.send( payload=Yield( cid=self._ctx.cid, pld=data, ), - # hide_tb=hide_tb, ) except ( trio.ClosedResourceError, @@ -562,7 +543,7 @@ def stream(func: Callable) -> Callable: ''' # TODO: apply whatever solution ``mypy`` ends up picking for this: # https://github.com/python/mypy/issues/2087#issuecomment-769266912 - func._tractor_stream_function = True # type: ignore + func._tractor_stream_function: bool = True # type: ignore sig = inspect.signature(func) params = sig.parameters diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py new file mode 100644 index 0000000..e78b79a --- /dev/null +++ b/tractor/msg/_ops.py @@ -0,0 +1,563 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Near-application abstractions for `MsgType.pld: PayloadT|Raw` +delivery, filtering and type checking as well as generic +operational helpers for processing transaction flows. + +''' +from __future__ import annotations +from contextlib import ( + # asynccontextmanager as acm, + contextmanager as cm, +) +from pprint import pformat +from typing import ( + Any, + Type, + TYPE_CHECKING, + # Union, +) +# ------ - ------ +from msgspec import ( + msgpack, + Raw, + Struct, + ValidationError, +) +import trio +# ------ - ------ +from tractor.log import get_logger +from tractor._exceptions import ( + MessagingError, + InternalError, + _raise_from_unexpected_msg, + MsgTypeError, + _mk_msg_type_err, + pack_from_raise, +) +from ._codec import ( + mk_dec, + MsgDec, +) +from .types import ( + CancelAck, + Error, + MsgType, + PayloadT, + Return, + Started, + Stop, + Yield, + # pretty_struct, +) + + +if TYPE_CHECKING: + from tractor._context import Context + from tractor._streaming import MsgStream + + +log = get_logger(__name__) + + +class PldRx(Struct): + ''' + A "msg payload receiver". + + The pairing of a "feeder" `trio.abc.ReceiveChannel` and an + interchange-specific (eg. msgpack) payload field decoder. The + validation/type-filtering rules are runtime mutable and allow + type constraining the set of `MsgType.pld: Raw|PayloadT` + values at runtime, per IPC task-context. + + This abstraction, being just below "user application code", + allows for the equivalent of our `MsgCodec` (used for + typer-filtering IPC dialog protocol msgs against a msg-spec) + but with granular control around payload delivery (i.e. the + data-values user code actually sees and uses (the blobs that + are "shuttled" by the wrapping dialog prot) such that invalid + `.pld: Raw` can be decoded and handled by IPC-primitive user + code (i.e. that operates on `Context` and `Msgstream` APIs) + without knowledge of the lower level `Channel`/`MsgTransport` + primitives nor the `MsgCodec` in use. Further, lazily decoding + payload blobs allows for topical (and maybe intentionally + "partial") encryption of msg field subsets. + + ''' + # TODO: better to bind it here? + # _rx_mc: trio.MemoryReceiveChannel + _msgdec: MsgDec = mk_dec(spec=Any) + + _ipc: Context|MsgStream|None = None + + @cm + def apply_to_ipc( + self, + ipc_prim: Context|MsgStream, + + ) -> PldRx: + ''' + Apply this payload receiver to an IPC primitive type, one + of `Context` or `MsgStream`. + + ''' + self._ipc = ipc_prim + try: + yield self + finally: + self._ipc = None + + @property + def dec(self) -> msgpack.Decoder: + return self._msgdec.dec + + def recv_pld_nowait( + self, + # TODO: make this `MsgStream` compat as well, see above^ + # ipc_prim: Context|MsgStream, + ctx: Context, + + ipc_msg: MsgType|None = None, + expect_msg: Type[MsgType]|None = None, + + **kwargs, + + ) -> Any|Raw: + + msg: MsgType = ( + ipc_msg + or + + # sync-rx msg from underlying IPC feeder (mem-)chan + ctx._rx_chan.receive_nowait() + ) + return self.dec_msg( + msg, + ctx=ctx, + expect_msg=expect_msg, + ) + + async def recv_pld( + self, + ctx: Context, + ipc_msg: MsgType|None = None, + expect_msg: Type[MsgType]|None = None, + + **kwargs + + ) -> Any|Raw: + ''' + Receive a `MsgType`, then decode and return its `.pld` field. + + ''' + msg: MsgType = ( + ipc_msg + or + + # async-rx msg from underlying IPC feeder (mem-)chan + await ctx._rx_chan.receive() + ) + return self.dec_msg( + msg, + ctx=ctx, + expect_msg=expect_msg, + ) + + def dec_msg( + self, + msg: MsgType, + ctx: Context, + expect_msg: Type[MsgType]|None = None, + + ) -> PayloadT|Raw: + ''' + Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and + return the value or raise an appropriate error. + + ''' + match msg: + # payload-data shuttle msg; deliver the `.pld` value + # directly to IPC (primitive) client-consumer code. + case ( + Started(pld=pld) # sync phase + |Yield(pld=pld) # streaming phase + |Return(pld=pld) # termination phase + ): + try: + pld: PayloadT = self._msgdec.decode(pld) + log.runtime( + 'Decode msg payload\n\n' + f'{msg}\n\n' + f'{pld}\n' + ) + return pld + + # XXX pld-type failure + except ValidationError as src_err: + msgterr: MsgTypeError = _mk_msg_type_err( + msg=msg, + codec=self._dec, + src_validation_error=src_err, + ) + msg: Error = pack_from_raise( + local_err=msgterr, + cid=msg.cid, + src_uid=ctx.chan.uid, + ) + + # XXX some other decoder specific failure? + # except TypeError as src_error: + # from .devx import mk_pdb + # mk_pdb().set_trace() + # raise src_error + + # a runtime-internal RPC endpoint response. + # always passthrough since (internal) runtime + # responses are generally never exposed to consumer + # code. + case CancelAck( + pld=bool(cancelled) + ): + return cancelled + + case Error(): + src_err = MessagingError( + 'IPC dialog termination by msg' + ) + + case _: + src_err = InternalError( + 'Unknown IPC msg ??\n\n' + f'{msg}\n' + ) + + # fallthrough and raise from `src_err` + _raise_from_unexpected_msg( + ctx=ctx, + msg=msg, + src_err=src_err, + log=log, + expect_msg=expect_msg, + hide_tb=False, + ) + + async def recv_msg_w_pld( + self, + ipc: Context|MsgStream, + + ) -> tuple[MsgType, PayloadT]: + ''' + Retrieve the next avail IPC msg, decode it's payload, and return + the pair of refs. + + ''' + msg: MsgType = await ipc._rx_chan.receive() + + # TODO: is there some way we can inject the decoded + # payload into an existing output buffer for the original + # msg instance? + pld: PayloadT = self.dec_msg( + msg, + ctx=ipc, + ) + return msg, pld + + +async def drain_to_final_msg( + ctx: Context, + + hide_tb: bool = True, + msg_limit: int = 6, + +) -> tuple[ + Return|None, + list[MsgType] +]: + ''' + Drain IPC msgs delivered to the underlying IPC primitive's + rx-mem-chan (eg. `Context._rx_chan`) from the runtime in + search for a final result or error. + + The motivation here is to ideally capture errors during ctxc + conditions where a canc-request/or local error is sent but the + local task also excepts and enters the + `Portal.open_context().__aexit__()` block wherein we prefer to + capture and raise any remote error or ctxc-ack as part of the + `ctx.result()` cleanup and teardown sequence. + + ''' + __tracebackhide__: bool = hide_tb + raise_overrun: bool = not ctx._allow_overruns + + # wait for a final context result by collecting (but + # basically ignoring) any bi-dir-stream msgs still in transit + # from the far end. + pre_result_drained: list[MsgType] = [] + return_msg: Return|None = None + while not ( + ctx.maybe_error + and not ctx._final_result_is_set() + ): + try: + # TODO: can remove? + # await trio.lowlevel.checkpoint() + + # NOTE: this REPL usage actually works here dawg! Bo + # from .devx._debug import pause + # await pause() + + # TODO: bad idea? + # -[ ] wrap final outcome channel wait in a scope so + # it can be cancelled out of band if needed? + # + # with trio.CancelScope() as res_cs: + # ctx._res_scope = res_cs + # msg: dict = await ctx._rx_chan.receive() + # if res_cs.cancelled_caught: + + # TODO: ensure there's no more hangs, debugging the + # runtime pretty preaase! + # from .devx._debug import pause + # await pause() + + # TODO: can remove this finally? + # we have no more need for the sync draining right + # since we're can kinda guarantee the async + # `.receive()` below will never block yah? + # + # if ( + # ctx._cancel_called and ( + # ctx.cancel_acked + # # or ctx.chan._cancel_called + # ) + # # or not ctx._final_result_is_set() + # # ctx.outcome is not + # # or ctx.chan._closed + # ): + # try: + # msg: dict = await ctx._rx_chan.receive_nowait()() + # except trio.WouldBlock: + # log.warning( + # 'When draining already `.cancel_called` ctx!\n' + # 'No final msg arrived..\n' + # ) + # break + # else: + # msg: dict = await ctx._rx_chan.receive() + + # TODO: don't need it right jefe? + # with trio.move_on_after(1) as cs: + # if cs.cancelled_caught: + # from .devx._debug import pause + # await pause() + + # pray to the `trio` gawds that we're corrent with this + # msg: dict = await ctx._rx_chan.receive() + msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx) + + # NOTE: we get here if the far end was + # `ContextCancelled` in 2 cases: + # 1. we requested the cancellation and thus + # SHOULD NOT raise that far end error, + # 2. WE DID NOT REQUEST that cancel and thus + # SHOULD RAISE HERE! + except trio.Cancelled: + + # CASE 2: mask the local cancelled-error(s) + # only when we are sure the remote error is + # the source cause of this local task's + # cancellation. + ctx.maybe_raise() + + # CASE 1: we DID request the cancel we simply + # continue to bubble up as normal. + raise + + match msg: + + # final result arrived! + case Return( + # cid=cid, + # pld=res, + ): + # ctx._result: Any = res + ctx._result: Any = pld + log.runtime( + 'Context delivered final draining msg:\n' + f'{pformat(msg)}' + ) + # XXX: only close the rx mem chan AFTER + # a final result is retreived. + # if ctx._rx_chan: + # await ctx._rx_chan.aclose() + # TODO: ^ we don't need it right? + return_msg = msg + break + + # far end task is still streaming to us so discard + # and report depending on local ctx state. + case Yield(): + pre_result_drained.append(msg) + if ( + (ctx._stream.closed + and (reason := 'stream was already closed') + ) + or (ctx.cancel_acked + and (reason := 'ctx cancelled other side') + ) + or (ctx._cancel_called + and (reason := 'ctx called `.cancel()`') + ) + or (len(pre_result_drained) > msg_limit + and (reason := f'"yield" limit={msg_limit}') + ) + ): + log.cancel( + 'Cancelling `MsgStream` drain since ' + f'{reason}\n\n' + f'<= {ctx.chan.uid}\n' + f' |_{ctx._nsf}()\n\n' + f'=> {ctx._task}\n' + f' |_{ctx._stream}\n\n' + + f'{pformat(msg)}\n' + ) + return ( + return_msg, + pre_result_drained, + ) + + # drain up to the `msg_limit` hoping to get + # a final result or error/ctxc. + else: + log.warning( + 'Ignoring "yield" msg during `ctx.result()` drain..\n' + f'<= {ctx.chan.uid}\n' + f' |_{ctx._nsf}()\n\n' + f'=> {ctx._task}\n' + f' |_{ctx._stream}\n\n' + + f'{pformat(msg)}\n' + ) + continue + + # stream terminated, but no result yet.. + # + # TODO: work out edge cases here where + # a stream is open but the task also calls + # this? + # -[ ] should be a runtime error if a stream is open right? + # Stop() + case Stop(): + pre_result_drained.append(msg) + log.cancel( + 'Remote stream terminated due to "stop" msg:\n\n' + f'{pformat(msg)}\n' + ) + continue + + # remote error msg, likely already handled inside + # `Context._deliver_msg()` + case Error(): + # TODO: can we replace this with `ctx.maybe_raise()`? + # -[ ] would this be handier for this case maybe? + # async with maybe_raise_on_exit() as raises: + # if raises: + # log.error('some msg about raising..') + # + re: Exception|None = ctx._remote_error + if re: + assert msg is ctx._cancel_msg + # NOTE: this solved a super duper edge case XD + # this was THE super duper edge case of: + # - local task opens a remote task, + # - requests remote cancellation of far end + # ctx/tasks, + # - needs to wait for the cancel ack msg + # (ctxc) or some result in the race case + # where the other side's task returns + # before the cancel request msg is ever + # rxed and processed, + # - here this surrounding drain loop (which + # iterates all ipc msgs until the ack or + # an early result arrives) was NOT exiting + # since we are the edge case: local task + # does not re-raise any ctxc it receives + # IFF **it** was the cancellation + # requester.. + # + # XXX will raise if necessary but ow break + # from loop presuming any supressed error + # (ctxc) should terminate the context! + ctx._maybe_raise_remote_err( + re, + # NOTE: obvi we don't care if we + # overran the far end if we're already + # waiting on a final result (msg). + # raise_overrun_from_self=False, + raise_overrun_from_self=raise_overrun, + ) + + break # OOOOOF, yeah obvi we need this.. + + # XXX we should never really get here + # right! since `._deliver_msg()` should + # always have detected an {'error': ..} + # msg and already called this right!?! + # elif error := unpack_error( + # msg=msg, + # chan=ctx._portal.channel, + # hide_tb=False, + # ): + # log.critical('SHOULD NEVER GET HERE!?') + # assert msg is ctx._cancel_msg + # assert error.msgdata == ctx._remote_error.msgdata + # assert error.ipc_msg == ctx._remote_error.ipc_msg + # from .devx._debug import pause + # await pause() + # ctx._maybe_cancel_and_set_remote_error(error) + # ctx._maybe_raise_remote_err(error) + + else: + # bubble the original src key error + raise + + # XXX should pretty much never get here unless someone + # overrides the default `MsgType` spec. + case _: + pre_result_drained.append(msg) + # It's definitely an internal error if any other + # msg type without a`'cid'` field arrives here! + if not msg.cid: + raise InternalError( + 'Unexpected cid-missing msg?\n\n' + f'{msg}\n' + ) + + raise RuntimeError('Unknown msg type: {msg}') + + else: + log.cancel( + 'Skipping `MsgStream` drain since final outcome is set\n\n' + f'{ctx.outcome}\n' + ) + + return ( + return_msg, + pre_result_drained, + )