WIP porting runtime to use `Msg`-spec

runtime_to_msgspec
Tyler Goodlet 2024-04-02 13:41:52 -04:00
parent f2ce4a3469
commit e153cc0187
10 changed files with 879 additions and 478 deletions
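
The gist of this WIP: every raw `dict` IPC payload keyed by `'started'`/`'yield'`/`'return'`/`'stop'`/`'error'`/`'cmd'` gets replaced by a typed `msgspec.Struct` message (`Started`, `Yield`, `Return`, `Stop`, `Error`, `Start`, `StartAck`, ...) from `tractor.msg.types`. As a rough orientation aid, below is a minimal, hypothetical sketch of such a tagged-union msg spec and a msgpack round trip; the real structs in `tractor.msg.types` carry more fields and different config, so treat this purely as an illustration of the idea.

```python
from typing import Any, Union

import msgspec


class Msg(msgspec.Struct, tag=True, tag_field='msg_type'):
    # every msg is bound to a "context id": the inter-actor
    # dialog (task pair) it belongs to.
    cid: str


class Started(Msg):
    pld: Any = None   # first value sent via `Context.started()`


class Yield(Msg):
    pld: Any = None   # one streamed value


class Stop(Msg):
    pass              # stream-termination marker, no payload


class Return(Msg):
    pld: Any = None   # final result of the remote task


# a tagged union lets the decoder pick the right struct
# based on the `msg_type` field embedded in each wire msg.
IpcMsg = Union[Started, Yield, Stop, Return]
enc = msgspec.msgpack.Encoder()
dec = msgspec.msgpack.Decoder(type=IpcMsg)

wire: bytes = enc.encode(Yield(cid='1234', pld={'tick': 42}))
assert dec.decode(wire) == Yield(cid='1234', pld={'tick': 42})
```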


@ -53,7 +53,14 @@ from ._exceptions import (
_raise_from_no_key_in_msg,
)
from .log import get_logger
from .msg import NamespacePath
from .msg import (
NamespacePath,
Msg,
Return,
Started,
Stop,
Yield,
)
from ._ipc import Channel
from ._streaming import MsgStream
from ._state import (
@ -96,7 +103,8 @@ async def _drain_to_final_msg(
# wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit
# from the far end.
pre_result_drained: list[dict] = []
# pre_result_drained: list[dict] = []
pre_result_drained: list[Msg] = []
while not (
ctx.maybe_error
and not ctx._final_result_is_set()
@ -155,7 +163,10 @@ async def _drain_to_final_msg(
# await pause()
# pray to the `trio` gawds that we're correct with this
msg: dict = await ctx._recv_chan.receive()
# msg: dict = await ctx._recv_chan.receive()
msg: Msg = await ctx._recv_chan.receive()
# always capture unexpected/non-result msgs
pre_result_drained.append(msg)
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
@ -175,24 +186,31 @@ async def _drain_to_final_msg(
# continue to bubble up as normal.
raise
try:
ctx._result: Any = msg['return']
log.runtime(
'Context delivered final draining msg:\n'
f'{pformat(msg)}'
)
# XXX: only close the rx mem chan AFTER
# a final result is retrieved.
# if ctx._recv_chan:
# await ctx._recv_chan.aclose()
# TODO: ^ we don't need it right?
break
match msg:
case Return(
cid=cid,
pld=res,
):
# try:
# ctx._result: Any = msg['return']
# ctx._result: Any = msg.pld
ctx._result: Any = res
log.runtime(
'Context delivered final draining msg:\n'
f'{pformat(msg)}'
)
# XXX: only close the rx mem chan AFTER
# a final result is retrieved.
# if ctx._recv_chan:
# await ctx._recv_chan.aclose()
# TODO: ^ we don't need it right?
break
except KeyError:
# always capture unexpected/non-result msgs
pre_result_drained.append(msg)
# except KeyError:
# except AttributeError:
case Yield():
# if 'yield' in msg:
if 'yield' in msg:
# far end task is still streaming to us so discard
# and report per local context state.
if (
@ -238,9 +256,10 @@ async def _drain_to_final_msg(
# TODO: work out edge cases here where
# a stream is open but the task also calls
# this?
# -[ ] should be a runtime error if a stream is open
# right?
elif 'stop' in msg:
# -[ ] should be a runtime error if a stream is open right?
# Stop()
case Stop():
# elif 'stop' in msg:
log.cancel(
'Remote stream terminated due to "stop" msg:\n\n'
f'{pformat(msg)}\n'
@ -249,78 +268,80 @@ async def _drain_to_final_msg(
# It's an internal error if any other msg type without
# a`'cid'` field arrives here!
if not msg.get('cid'):
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
case _:
# if not msg.get('cid'):
if not msg.cid:
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
# XXX fallthrough to handle expected error XXX
# TODO: replace this with `ctx.maybe_raise()`
#
# TODO: would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
# XXX fallthrough to handle expected error XXX
# TODO: replace this with `ctx.maybe_raise()`
#
# TODO: would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
re: Exception|None = ctx._remote_error
if re:
log.critical(
'Remote ctx terminated due to "error" msg:\n'
f'{re}'
)
assert msg is ctx._cancel_msg
# NOTE: this solved a super dupe edge case XD
# this was THE super duper edge case of:
# - local task opens a remote task,
# - requests remote cancellation of far end
# ctx/tasks,
# - needs to wait for the cancel ack msg
# (ctxc) or some result in the race case
# where the other side's task returns
# before the cancel request msg is ever
# rxed and processed,
# - here this surrounding drain loop (which
# iterates all ipc msgs until the ack or
# an early result arrives) was NOT exiting
# since we are the edge case: local task
# does not re-raise any ctxc it receives
# IFF **it** was the cancellation
# requester..
# will raise if necessary, ow break from
# loop presuming any error terminates the
# context!
ctx._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun,
)
re: Exception|None = ctx._remote_error
if re:
log.critical(
'Remote ctx terminated due to "error" msg:\n'
f'{re}'
)
assert msg is ctx._cancel_msg
# NOTE: this solved a super dupe edge case XD
# this was THE super duper edge case of:
# - local task opens a remote task,
# - requests remote cancellation of far end
# ctx/tasks,
# - needs to wait for the cancel ack msg
# (ctxc) or some result in the race case
# where the other side's task returns
# before the cancel request msg is ever
# rxed and processed,
# - here this surrounding drain loop (which
# iterates all ipc msgs until the ack or
# an early result arrives) was NOT exiting
# since we are the edge case: local task
# does not re-raise any ctxc it receives
# IFF **it** was the cancellation
# requester..
# will raise if necessary, ow break from
# loop presuming any error terminates the
# context!
ctx._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun,
)
break # OOOOOF, yeah obvi we need this..
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
elif error := unpack_error(
msg=msg,
chan=ctx._portal.channel,
hide_tb=False,
):
log.critical('SHOULD NEVER GET HERE!?')
assert msg is ctx._cancel_msg
assert error.msgdata == ctx._remote_error.msgdata
from .devx._debug import pause
await pause()
ctx._maybe_cancel_and_set_remote_error(error)
ctx._maybe_raise_remote_err(error)
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
elif error := unpack_error(
msg=msg,
chan=ctx._portal.channel,
hide_tb=False,
):
log.critical('SHOULD NEVER GET HERE!?')
assert msg is ctx._cancel_msg
assert error.msgdata == ctx._remote_error.msgdata
from .devx._debug import pause
await pause()
ctx._maybe_cancel_and_set_remote_error(error)
ctx._maybe_raise_remote_err(error)
else:
# bubble the original src key error
raise
else:
# bubble the original src key error
raise
else:
log.cancel(
'Skipping `MsgStream` drain since final outcome is set\n\n'
@ -710,10 +731,14 @@ class Context:
async def send_stop(self) -> None:
# await pause()
await self.chan.send({
'stop': True,
'cid': self.cid
})
# await self.chan.send({
# # Stop(
# 'stop': True,
# 'cid': self.cid
# })
await self.chan.send(
Stop(cid=self.cid)
)
def _maybe_cancel_and_set_remote_error(
self,
@ -1395,17 +1420,19 @@ class Context:
for msg in drained_msgs:
# TODO: mask this by default..
if 'return' in msg:
# if 'return' in msg:
if isinstance(msg, Return):
# from .devx import pause
# await pause()
raise InternalError(
# raise InternalError(
log.warning(
'Final `return` msg should never be drained !?!?\n\n'
f'{msg}\n'
)
log.cancel(
'Ctx drained pre-result msgs:\n'
f'{drained_msgs}'
f'{pformat(drained_msgs)}'
)
self.maybe_raise(
@ -1613,7 +1640,18 @@ class Context:
f'called `.started()` twice on context with {self.chan.uid}'
)
await self.chan.send({'started': value, 'cid': self.cid})
# await self.chan.send(
# {
# 'started': value,
# 'cid': self.cid,
# }
# )
await self.chan.send(
Started(
cid=self.cid,
pld=value,
)
)
self._started_called = True
async def _drain_overflows(
@ -1668,7 +1706,8 @@ class Context:
async def _deliver_msg(
self,
msg: dict,
# msg: dict,
msg: Msg,
) -> bool:
'''
@ -1852,7 +1891,7 @@ class Context:
# anything different.
return False
else:
txt += f'\n{msg}\n'
# txt += f'\n{msg}\n'
# raise local overrun and immediately pack as IPC
# msg for far end.
try:
@ -1983,15 +2022,17 @@ async def open_context_from_portal(
)
assert ctx._remote_func_type == 'context'
msg: dict = await ctx._recv_chan.receive()
msg: Started = await ctx._recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first: Any = msg['started']
# first: Any = msg['started']
first: Any = msg.pld
ctx._started_called: bool = True
except KeyError as src_error:
# except KeyError as src_error:
except AttributeError as src_error:
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
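
Throughout this file's hunks the pattern is the same: dict-key probing (`'yield' in msg`, `msg['return']`, `msg['started']`) is replaced by `match`/`case` dispatch or attribute access on the new structs. A toy version of the drain-loop classification from `_drain_to_final_msg()`, using stand-in struct definitions rather than tractor's real `tractor.msg.types` classes, looks roughly like:

```python
from typing import Any

import msgspec


class Msg(msgspec.Struct, tag=True):
    cid: str


class Yield(Msg):
    pld: Any = None


class Stop(Msg):
    pass


class Return(Msg):
    pld: Any = None


def drain_one(msg: Msg) -> tuple[bool, Any]:
    '''
    Classify one drained IPC msg roughly the way the new
    `_drain_to_final_msg()` loop does: report whether the final
    result arrived and, if so, its payload.

    '''
    match msg:
        case Return(pld=res):
            return True, res     # final result: stop draining
        case Yield():
            return False, None   # late streamed value: discard
        case Stop():
            return False, None   # stream closed: keep draining
        case _:
            raise RuntimeError(f'Unexpected msg: {msg!r}')


assert drain_one(Return(cid='77', pld='done')) == (True, 'done')
assert drain_one(Yield(cid='77', pld=1)) == (False, None)
```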


@ -135,6 +135,7 @@ def _trio_main(
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.cancel(
'Actor received KBI\n'


@ -31,9 +31,16 @@ import textwrap
import traceback
import trio
from msgspec import structs
from tractor._state import current_actor
from tractor.log import get_logger
from tractor.msg import (
Error,
Msg,
Stop,
Yield,
)
if TYPE_CHECKING:
from ._context import Context
@ -135,6 +142,8 @@ class RemoteActorError(Exception):
# and instead render if from `.boxed_type_str`?
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
# TODO: make this a `.errmsg: Error` throughout?
self.msgdata: dict[str, Any] = msgdata
# TODO: mask out eventually or place in `pack_error()`
@ -464,7 +473,23 @@ class AsyncioCancelled(Exception):
'''
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
'''
IPC related msg (typing), transaction (ordering) or dialog
handling error.
'''
class MsgTypeError(MessagingError):
'''
Equivalent of a `TypeError` for an IPC wire-message
due to an invalid field value (type).
Normally this is re-raised from some `.msg._codec`
decode error raised by a backend interchange lib
like `msgspec` or `pycapnproto`.
'''
def pack_error(
@ -473,7 +498,7 @@ def pack_error(
tb: str|None = None,
cid: str|None = None,
) -> dict[str, dict]:
) -> Error|dict[str, dict]:
'''
Create an "error message" which boxes a locally caught
exception's meta-data and encodes it for wire transport via an
@ -536,17 +561,23 @@ def pack_error(
# content's `.msgdata`).
error_msg['tb_str'] = tb_str
pkt: dict = {
'error': error_msg,
}
if cid:
pkt['cid'] = cid
# Error()
# pkt: dict = {
# 'error': error_msg,
# }
pkt: Error = Error(
cid=cid,
**error_msg,
# TODO: just get rid of `.pld` on this msg?
)
# if cid:
# pkt['cid'] = cid
return pkt
def unpack_error(
msg: dict[str, Any],
msg: dict[str, Any]|Error,
chan: Channel|None = None,
box_type: RemoteActorError = RemoteActorError,
@ -564,15 +595,17 @@ def unpack_error(
'''
__tracebackhide__: bool = hide_tb
error_dict: dict[str, dict] | None
if (
error_dict := msg.get('error')
) is None:
error_dict: dict[str, dict]|None
if not isinstance(msg, Error):
# if (
# error_dict := msg.get('error')
# ) is None:
# no error field, nothing to unpack.
return None
# retrieve the remote error's msg encoded details
tb_str: str = error_dict.get('tb_str', '')
# tb_str: str = error_dict.get('tb_str', '')
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
+
@ -581,7 +614,8 @@ def unpack_error(
# try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance.
boxed_type_str: str = error_dict['boxed_type_str']
# boxed_type_str: str = error_dict['boxed_type_str']
boxed_type_str: str = msg.boxed_type_str
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
if boxed_type_str == 'ContextCancelled':
@ -595,7 +629,11 @@ def unpack_error(
# original source error.
elif boxed_type_str == 'RemoteActorError':
assert boxed_type is RemoteActorError
assert len(error_dict['relay_path']) >= 1
# assert len(error_dict['relay_path']) >= 1
assert len(msg.relay_path) >= 1
# TODO: mk RAE just take the `Error` instance directly?
error_dict: dict = structs.asdict(msg)
exc = box_type(
message,
@ -623,11 +661,12 @@ def is_multi_cancelled(exc: BaseException) -> bool:
def _raise_from_no_key_in_msg(
ctx: Context,
msg: dict,
msg: Msg,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
expect_msg: str = Yield,
stream: MsgStream | None = None,
# allow "deeper" tbs when debugging B^o
@ -660,8 +699,10 @@ def _raise_from_no_key_in_msg(
# an internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
cid: str = msg.cid
# cid: str = msg['cid']
# except KeyError as src_err:
except AttributeError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n\n'
@ -672,7 +713,10 @@ def _raise_from_no_key_in_msg(
# TODO: test that shows stream raising an expected error!!!
# raise the error message in a boxed exception type!
if msg.get('error'):
# if msg.get('error'):
if isinstance(msg, Error):
# match msg:
# case Error():
raise unpack_error(
msg,
ctx.chan,
@ -683,8 +727,10 @@ def _raise_from_no_key_in_msg(
# `MsgStream` termination msg.
# TODO: does it make more sense to pack
# the stream._eoc outside this in the caller always?
# case Stop():
elif (
msg.get('stop')
# msg.get('stop')
isinstance(msg, Stop)
or (
stream
and stream._eoc
@ -725,14 +771,16 @@ def _raise_from_no_key_in_msg(
stream
and stream._closed
):
raise trio.ClosedResourceError('This stream was closed')
# TODO: our own error subtype?
raise trio.ClosedResourceError(
'This stream was closed'
)
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f"{_type} was expecting a '{expect_key}' message"
f"{_type} was expecting a '{expect_key.upper()}' message"
" BUT received a non-error msg:\n"
f'{pformat(msg)}'
) from src_err
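
`pack_error()`/`unpack_error()` above now pass around a dedicated `Error` struct (with `cid`, `boxed_type_str`, `tb_str`, `relay_path`, ...) instead of a nested `{'error': {...}}` dict. A self-contained sketch of boxing an exception into such a struct and rebuilding one from it might look like the following; the field names mirror the diff but the helpers here are simplified stand-ins, not tractor's API:

```python
import traceback

import msgspec


class Error(msgspec.Struct, tag=True):
    cid: str | None
    boxed_type_str: str   # name of the original exception type
    tb_str: str           # formatted remote traceback text


def pack_error(err: BaseException, cid: str | None = None) -> Error:
    # box a locally caught exception's metadata for wire transport
    return Error(
        cid=cid,
        boxed_type_str=type(err).__name__,
        tb_str=''.join(traceback.format_exception(err)),
    )


def unpack_error(msg: object) -> RuntimeError | None:
    # only `Error` msgs carry boxed exception data; anything else
    # means there is nothing to unpack (mirroring the new
    # `isinstance(msg, Error)` check above).
    if not isinstance(msg, Error):
        return None
    return RuntimeError(
        f'remote raised {msg.boxed_type_str}:\n{msg.tb_str}'
    )


try:
    1 / 0
except ZeroDivisionError as err:
    boxed: Error = pack_error(err, cid='abc123')

assert unpack_error(boxed) is not None
assert unpack_error({'not': 'an Error msg'}) is None
```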


@ -38,17 +38,23 @@ from typing import (
Protocol,
Type,
TypeVar,
Union,
)
import msgspec
from tricycle import BufferedReceiveStream
import trio
from tractor.log import get_logger
from tractor._exceptions import TransportClosed
from tractor._exceptions import (
TransportClosed,
MsgTypeError,
)
from tractor.msg import (
_ctxvar_MsgCodec,
_codec,
MsgCodec,
mk_codec,
types,
)
log = get_logger(__name__)
@ -163,7 +169,16 @@ class MsgpackTCPStream(MsgTransport):
# allow for custom IPC msg interchange format
# dynamic override Bo
self.codec: MsgCodec = codec or mk_codec()
self._task = trio.lowlevel.current_task()
self._codec: MsgCodec = (
codec
or
_codec._ctxvar_MsgCodec.get()
)
log.critical(
'!?!: USING STD `tractor` CODEC !?!?\n'
f'{self._codec}\n'
)
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
'''
@ -171,7 +186,6 @@ class MsgpackTCPStream(MsgTransport):
stream using the current task's `MsgCodec`.
'''
import msgspec # noqa
decodes_failed: int = 0
while True:
@ -206,7 +220,19 @@ class MsgpackTCPStream(MsgTransport):
try:
# NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
yield _ctxvar_MsgCodec.get().decode(msg_bytes)
codec: MsgCodec = _ctxvar_MsgCodec.get()
if self._codec.pld_spec != codec.pld_spec:
# assert (
# task := trio.lowlevel.current_task()
# ) is not self._task
# self._task = task
self._codec = codec
log.critical(
'.recv() USING NEW CODEC !?!?\n'
f'{self._codec}\n\n'
f'msg_bytes -> {msg_bytes}\n'
)
yield codec.decode(msg_bytes)
# TODO: remove, was only for orig draft impl
# testing.
@ -221,6 +247,41 @@ class MsgpackTCPStream(MsgTransport):
#
# yield obj
# XXX NOTE: since the below error derives from
# `DecodeError` we need to catch it specially
# and always raise such that spec violations
# are never allowed to be caught silently!
except msgspec.ValidationError as verr:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgspec.msgpack.decode(msg_bytes)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(types, msg_type_name)
errmsg: str = (
f'Received invalid IPC `{msg_type_name}` msg\n\n'
)
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = verr.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
if field_val := msg_dict.get(maybe_field):
field_type: Union[Type] = msg_type.__signature__.parameters[
maybe_field
].annotation
errmsg += (
f'{msg.rstrip("`")}\n\n'
f'{msg_type}\n'
f' |_.{maybe_field}: {field_type} = {field_val}\n'
)
raise MsgTypeError(errmsg) from verr
except (
msgspec.DecodeError,
UnicodeDecodeError,
@ -230,14 +291,15 @@ class MsgpackTCPStream(MsgTransport):
# do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up.
try:
msg_str: str | bytes = msg_bytes.decode()
msg_str: str|bytes = msg_bytes.decode()
except UnicodeDecodeError:
msg_str = msg_bytes
log.error(
'`msgspec` failed to decode!?\n'
'dumping bytes:\n'
f'{msg_str!r}'
log.exception(
'Failed to decode msg?\n'
f'{codec}\n\n'
'Rxed bytes from wire:\n\n'
f'{msg_str!r}\n'
)
decodes_failed += 1
else:
@ -258,8 +320,21 @@ class MsgpackTCPStream(MsgTransport):
# NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
bytes_data: bytes = _ctxvar_MsgCodec.get().encode(msg)
# bytes_data: bytes = self.codec.encode(msg)
codec: MsgCodec = _ctxvar_MsgCodec.get()
# if self._codec != codec:
if self._codec.pld_spec != codec.pld_spec:
self._codec = codec
log.critical(
'.send() using NEW CODEC !?!?\n'
f'{self._codec}\n\n'
f'OBJ -> {msg}\n'
)
if type(msg) not in types.__spec__:
log.warning(
'Sending non-`Msg`-spec msg?\n\n'
f'{msg}\n'
)
bytes_data: bytes = codec.encode(msg)
# supposedly the fastest says,
# https://stackoverflow.com/a/54027962
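
The new `except msgspec.ValidationError` branch above converts a spec violation into a `MsgTypeError`, re-decoding the raw bytes without a schema so the offending payload can be included in the report. A minimal standalone demo of that fallback, with a made-up `Yield` schema in place of tractor's codec machinery:

```python
import msgspec


class Yield(msgspec.Struct):
    cid: str
    pld: int   # pretend this stream's payload spec only allows ints


typed_dec = msgspec.msgpack.Decoder(type=Yield)

# simulate a peer sending a payload which violates the spec
wire: bytes = msgspec.msgpack.encode({'cid': '42', 'pld': 'not-an-int'})

try:
    typed_dec.decode(wire)
except msgspec.ValidationError as verr:
    # fall back to a schema-less decode so the offending msg can
    # still be rendered in the error report, as the `MsgTypeError`
    # path above does.
    raw: dict = msgspec.msgpack.decode(wire)
    print(f'Received invalid IPC msg {raw!r}\n -> {verr}')
```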


@ -45,7 +45,10 @@ from ._state import (
)
from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from .msg import (
NamespacePath,
Return,
)
from ._exceptions import (
unpack_error,
NoResult,
@ -66,7 +69,8 @@ log = get_logger(__name__)
# `._raise_from_no_key_in_msg()` (after tweak to
# accept a `chan: Channel` arg) in key block!
def _unwrap_msg(
msg: dict[str, Any],
# msg: dict[str, Any],
msg: Return,
channel: Channel,
hide_tb: bool = True,
@ -79,18 +83,21 @@ def _unwrap_msg(
__tracebackhide__: bool = hide_tb
try:
return msg['return']
except KeyError as ke:
return msg.pld
# return msg['return']
# except KeyError as ke:
except AttributeError as err:
# internal error should never get here
assert msg.get('cid'), (
# assert msg.get('cid'), (
assert msg.cid, (
"Received internal error at portal?"
)
raise unpack_error(
msg,
channel
) from ke
) from err
class Portal:
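
`_unwrap_msg()` above shows the mechanical change repeated throughout this commit: dict-key access guarded by `except KeyError` becomes attribute access guarded by `except AttributeError`. A tiny stand-alone illustration (hypothetical helper, not tractor's):

```python
from typing import Any

import msgspec


class Return(msgspec.Struct):
    cid: str
    pld: Any = None


def unwrap_result(msg: object) -> Any:
    # payloads now live on `.pld`, so an unexpected msg type
    # surfaces as `AttributeError` instead of the old `KeyError`
    # raised by `msg['return']`.
    try:
        return msg.pld
    except AttributeError as err:
        raise RuntimeError(f'Not a result msg: {msg!r}') from err


assert unwrap_result(Return(cid='9', pld='final')) == 'final'

try:
    unwrap_result({'cid': '9', 'return': 'final'})  # legacy dict shape
except RuntimeError:
    pass
```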


@ -55,12 +55,21 @@ from ._exceptions import (
TransportClosed,
)
from .devx import (
# pause,
pause,
maybe_wait_for_debugger,
_debug,
)
from . import _state
from .log import get_logger
from tractor.msg.types import (
Start,
StartAck,
Started,
Stop,
Yield,
Return,
Error,
)
if TYPE_CHECKING:
@ -89,10 +98,13 @@ async def _invoke_non_context(
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send({
'cid': cid,
'functype': 'asyncgen',
})
# await chan.send({
await chan.send(
StartAck(
cid=cid,
functype='asyncgen',
)
)
# XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be
@ -112,27 +124,45 @@ async def _invoke_non_context(
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({
'yield': item,
'cid': cid,
})
# await chan.send({
# # Yield()
# 'cid': cid,
# 'yield': item,
# })
await chan.send(
Yield(
cid=cid,
pld=item,
)
)
log.runtime(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({
'stop': True,
'cid': cid,
})
await chan.send(
Stop(cid=cid)
)
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send({
'cid': cid,
'functype': 'asyncgen',
})
await chan.send(
StartAck(
cid=cid,
functype='asyncgen',
)
)
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncgen',
# })
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manually construct the response dict-packet-responses as
@ -145,10 +175,14 @@ async def _invoke_non_context(
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({
'stop': True,
'cid': cid
})
await chan.send(
Stop(cid=cid)
)
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
@ -160,10 +194,17 @@ async def _invoke_non_context(
# way: using the linked IPC context machinery.
failed_resp: bool = False
try:
await chan.send({
'functype': 'asyncfunc',
'cid': cid
})
await chan.send(
StartAck(
cid=cid,
functype='asyncfunc',
)
)
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncfunc',
# })
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
@ -197,10 +238,17 @@ async def _invoke_non_context(
and chan.connected()
):
try:
await chan.send({
'return': result,
'cid': cid,
})
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': result,
# })
await chan.send(
Return(
cid=cid,
pld=result,
)
)
except (
BrokenPipeError,
trio.BrokenResourceError,
@ -381,6 +429,8 @@ async def _invoke(
# XXX for .pause_from_sync()` usage we need to make sure
# `greenback` is bootstrapped in the subactor!
await _debug.maybe_init_greenback()
# else:
# await pause()
# TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)?
@ -493,10 +543,18 @@ async def _invoke(
# a "context" endpoint type is the most general and
# "least sugary" type of RPC ep with support for
# bi-dir streaming B)
await chan.send({
'cid': cid,
'functype': 'context',
})
# StartAck
await chan.send(
StartAck(
cid=cid,
functype='context',
)
)
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'context',
# })
# TODO: should we also use an `.open_context()` equiv
# for this callee side by factoring the impl from
@ -520,10 +578,17 @@ async def _invoke(
ctx._result = res
# deliver final result to caller side.
await chan.send({
'return': res,
'cid': cid
})
await chan.send(
Return(
cid=cid,
pld=res,
)
)
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': res,
# })
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
@ -696,7 +761,8 @@ async def try_ship_error_to_remote(
try:
# NOTE: normally only used for internal runtime errors
# so ship to peer actor without a cid.
msg: dict = pack_error(
# msg: dict = pack_error(
msg: Error = pack_error(
err,
cid=cid,
@ -712,12 +778,13 @@ async def try_ship_error_to_remote(
trio.BrokenResourceError,
BrokenPipeError,
):
err_msg: dict = msg['error']['tb_str']
# err_msg: dict = msg['error']['tb_str']
log.critical(
'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n'
f'X=> {channel.uid}\n\n'
f'{err_msg}\n'
# f'{err_msg}\n'
f'{msg}\n'
)
@ -777,31 +844,6 @@ async def process_messages(
with CancelScope(shield=shield) as loop_cs:
task_status.started(loop_cs)
async for msg in chan:
# dedicated loop terminate sentinel
if msg is None:
tasks: dict[
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = actor._rpc_tasks.copy()
log.cancel(
f'Peer IPC channel terminated via `None` sentinel msg?\n'
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
for (channel, cid) in tasks:
if channel is chan:
await actor._cancel_task(
cid,
channel,
requesting_uid=channel.uid,
ipc_msg=msg,
)
break
log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n'
@ -811,216 +853,294 @@ async def process_messages(
f'{pformat(msg)}\n'
)
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._push_result(
chan,
cid,
msg,
)
match msg:
log.runtime(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}:\n'
f'|_{chan}\n'
# if msg is None:
# dedicated loop terminate sentinel
case None:
# f'last msg: {msg}\n'
)
continue
# process a 'cmd' request-msg unpack
# TODO: impl with native `msgspec.Struct` support !!
# -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
try:
(
ns,
funcname,
kwargs,
actorid,
cid,
) = msg['cmd']
except KeyError:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
# (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers
# (normally portals) by marking the channel as errored
assert chan.uid
exc = unpack_error(msg, chan=chan)
chan._exc = exc
raise exc
log.runtime(
'Handling RPC cmd from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
if ns == 'self':
if funcname == 'cancel':
func: Callable = actor.cancel
kwargs |= {
'req_chan': chan,
}
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
log.runtime(
'Cancelling IPC transport msg-loop with peer:\n'
tasks: dict[
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = actor._rpc_tasks.copy()
log.cancel(
f'Peer IPC channel terminated via `None` sentinel msg?\n'
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
loop_cs.cancel()
for (channel, cid) in tasks:
if channel is chan:
await actor._cancel_task(
cid,
channel,
requesting_uid=channel.uid,
ipc_msg=msg,
)
break
if funcname == '_cancel_task':
func: Callable = actor._cancel_task
# we immediately start the runtime machinery
# shutdown
# with CancelScope(shield=True):
target_cid: str = kwargs['cid']
kwargs |= {
# NOTE: ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
'requesting_uid': chan.uid,
'ipc_msg': msg,
}
# TODO: remove? already have emit in meth.
# log.runtime(
# f'Rx RPC task cancel request\n'
# f'<= canceller: {chan.uid}\n'
# f' |_{chan}\n\n'
# f'=> {actor}\n'
# f' |_cid: {target_cid}\n'
# )
try:
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
continue
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func: Callable = getattr(actor, funcname)
else:
# complain to client about restricted modules
try:
func = actor._get_rpc_func(ns, funcname)
except (
ModuleNotExposed,
AttributeError,
) as err:
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
await chan.send(err_msg)
continue
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
# TODO: possibly a service-tn per IPC channel for
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
# table?
log.runtime(
f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n'
# TODO: maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n'
f'=> {actor}\n'
f' |_cid: {cid}\n'
f' |>> {func}()\n'
)
assert actor._service_n # wait why? do it at top?
try:
ctx: Context = await actor._service_n.start(
partial(
_invoke,
actor,
cid,
# cid = msg.get('cid')
# if cid:
case (
StartAck(cid=cid)
| Started(cid=cid)
| Yield(cid=cid)
| Stop(cid=cid)
| Return(cid=cid)
| Error(cid=cid)
):
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._push_result(
chan,
func,
kwargs,
),
name=funcname,
)
cid,
msg,
)
except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task: bool = True
break
log.runtime(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}:\n'
f'|_{chan}\n'
# in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception):
log.warning(
'Task for RPC failed?'
f'|_ {func}()\n\n'
# f'last msg: {msg}\n'
)
continue
f'{err}'
)
continue
# process a 'cmd' request-msg unpack
# TODO: impl with native `msgspec.Struct` support !!
# -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
case Start(
cid=cid,
ns=ns,
func=funcname,
kwargs=kwargs,
uid=actorid,
):
# try:
# (
# ns,
# funcname,
# kwargs,
# actorid,
# cid,
# ) = msg['cmd']
else:
# mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
# # TODO: put in `case Error():` right?
# except KeyError:
# # This is the non-rpc error case, that is, an
# # error **not** raised inside a call to ``_invoke()``
# # (i.e. no cid was provided in the msg - see above).
# # Push this error to all local channel consumers
# # (normally portals) by marking the channel as errored
# assert chan.uid
# exc = unpack_error(msg, chan=chan)
# chan._exc = exc
# raise exc
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
actor._rpc_tasks[(chan, cid)] = (
ctx,
func,
trio.Event(),
)
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
# case Start(
# ns='self',
# funcname='cancel',
# ):
if ns == 'self':
if funcname == 'cancel':
func: Callable = actor.cancel
kwargs |= {
'req_chan': chan,
}
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
log.runtime(
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
)
loop_cs.cancel()
break
# case Start(
# ns='self',
# funcname='_cancel_task',
# ):
if funcname == '_cancel_task':
func: Callable = actor._cancel_task
# we immediately start the runtime machinery
# shutdown
# with CancelScope(shield=True):
target_cid: str = kwargs['cid']
kwargs |= {
# NOTE: ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
'requesting_uid': chan.uid,
'ipc_msg': msg,
}
# TODO: remove? already have emit in meth.
# log.runtime(
# f'Rx RPC task cancel request\n'
# f'<= canceller: {chan.uid}\n'
# f' |_{chan}\n\n'
# f'=> {actor}\n'
# f' |_cid: {target_cid}\n'
# )
try:
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
continue
# case Start(
# ns='self',
# funcname='register_actor',
# ):
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func: Callable = getattr(actor, funcname)
# case Start(
# ns=str(),
# funcname=funcname,
# ):
else:
# complain to client about restricted modules
try:
func = actor._get_rpc_func(ns, funcname)
except (
ModuleNotExposed,
AttributeError,
) as err:
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
await chan.send(err_msg)
continue
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
# TODO: possibly a service-tn per IPC channel for
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
# table?
log.runtime(
f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n'
# TODO: maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n'
f'=> {actor}\n'
f' |_cid: {cid}\n'
f' |>> {func}()\n'
)
assert actor._service_n # wait why? do it at top?
try:
ctx: Context = await actor._service_n.start(
partial(
_invoke,
actor,
cid,
chan,
func,
kwargs,
),
name=funcname,
)
except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task: bool = True
break
# in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception):
log.warning(
'Task for RPC failed?'
f'|_ {func}()\n\n'
f'{err}'
)
continue
else:
# mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
actor._rpc_tasks[(chan, cid)] = (
ctx,
func,
trio.Event(),
)
case Error()|_:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
# (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers
# (normally portals) by marking the channel as errored
log.exception(
f'Unhandled IPC msg:\n\n'
f'{msg}\n'
)
assert chan.uid
exc = unpack_error(
msg,
chan=chan,
)
chan._exc = exc
raise exc
log.runtime(
'Waiting on next IPC msg from\n'
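
The rewritten `process_messages()` loop above swaps `cid = msg.get('cid')` and `msg['cmd']` unpacking for a structural `match`: result-style msgs (`StartAck | Started | Yield | Stop | Return | Error`) get routed to their waiting context by `cid`, a `Start` msg schedules a new RPC task, and `None` stays the loop-terminate sentinel. A stripped-down dispatcher showing just that `match` shape with stand-in structs (all the real cancellation/scheduling logic elided):

```python
from typing import Any

import msgspec


class Msg(msgspec.Struct, tag=True):
    cid: str


class Start(Msg):
    ns: str
    func: str
    kwargs: dict | None = None


class Yield(Msg):
    pld: Any = None


class Stop(Msg):
    pass


class Return(Msg):
    pld: Any = None


def dispatch(msg: Msg | None) -> str:
    match msg:
        case None:
            # dedicated loop-terminate sentinel
            return 'cancel all RPC tasks and exit the msg loop'

        case Start(cid=cid, ns=ns, func=func):
            return f'schedule RPC task {ns}.{func}() for cid={cid}'

        case Yield(cid=cid) | Stop(cid=cid) | Return(cid=cid):
            return f'deliver response to local waiter on cid={cid}'

        case _:
            return 'unexpected msg: box as error and raise'


assert 'schedule' in dispatch(Start(cid='1', ns='self', func='cancel'))
assert 'deliver' in dispatch(Return(cid='1', pld=0))
assert 'exit' in dispatch(None)
```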


@ -91,6 +91,23 @@ from ._rpc import (
process_messages,
try_ship_error_to_remote,
)
from tractor.msg import (
types as msgtypes,
pretty_struct,
)
# from tractor.msg.types import (
# Aid,
# SpawnSpec,
# Start,
# StartAck,
# Started,
# Yield,
# Stop,
# Return,
# Error,
# )
if TYPE_CHECKING:
@ -147,6 +164,7 @@ class Actor:
# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope|None = None
_spawn_spec: SpawnSpec|None = None
# syncs for setup/teardown sequences
_server_down: trio.Event|None = None
@ -537,7 +555,8 @@ class Actor:
f'{pformat(msg)}\n'
)
cid = msg.get('cid')
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await self._push_result(
@ -889,29 +908,44 @@ class Actor:
f'=> {ns}.{func}({kwargs})\n'
)
await chan.send(
{'cmd': (
ns,
func,
kwargs,
self.uid,
cid,
)}
msgtypes.Start(
ns=ns,
func=func,
kwargs=kwargs,
uid=self.uid,
cid=cid,
)
)
# {'cmd': (
# ns,
# func,
# kwargs,
# self.uid,
# cid,
# )}
# )
# Wait on first response msg and validate; this should be
# immediate.
first_msg: dict = await ctx._recv_chan.receive()
functype: str = first_msg.get('functype')
# first_msg: dict = await ctx._recv_chan.receive()
# functype: str = first_msg.get('functype')
if 'error' in first_msg:
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
try:
functype: str = first_msg.functype
except AttributeError:
raise unpack_error(first_msg, chan)
# if 'error' in first_msg:
# raise unpack_error(first_msg, chan)
elif functype not in (
if functype not in (
'asyncfunc',
'asyncgen',
'context',
):
raise ValueError(f"{first_msg} is an invalid response packet?")
raise ValueError(
f'{first_msg} is an invalid response packet?'
)
ctx._remote_func_type = functype
return ctx
@ -944,24 +978,36 @@ class Actor:
await self._do_handshake(chan)
accept_addrs: list[tuple[str, int]]|None = None
if self._spawn_method == "trio":
# Receive runtime state from our parent
parent_data: dict[str, Any]
parent_data = await chan.recv()
log.runtime(
'Received state from parent:\n\n'
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
)
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if self._spawn_method == "trio":
# Receive runtime state from our parent
# parent_data: dict[str, Any]
# parent_data = await chan.recv()
# TODO: maybe we should just wrap this directly
# in a `Actor.spawn_info: SpawnInfo` struct?
spawnspec: msgtypes.SpawnSpec = await chan.recv()
self._spawn_spec = spawnspec
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
log.runtime(
'Received runtime spec from parent:\n\n'
f'{pformat(spawnspec)}\n'
)
# accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
# rvs = parent_data.pop('_runtime_vars')
rvs = spawnspec._runtime_vars
if rvs['_debug_mode']:
try:
log.info('Enabling `stackscope` traces on SIGUSR1')
log.info(
'Enabling `stackscope` traces on SIGUSR1'
)
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
@ -969,28 +1015,40 @@ class Actor:
'`stackscope` not installed for use in debug mode!'
)
log.runtime(f"Runtime vars are: {rvs}")
log.runtime(f'Runtime vars are: {rvs}')
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
for attr, value in parent_data.items():
if (
attr == 'reg_addrs'
and value
):
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cast manually here since it's what our
# internals expect.
# TODO: we don't really NEED these as
# tuples so we can probably drop this
# casting since apparently in python lists
# are "more efficient"?
self.reg_addrs = [tuple(val) for val in value]
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cast manually here since it's what our
# internals expect.
#
self.reg_addrs = [
# TODO: we don't really NEED these as tuples?
# so we can probably drop this casting since
# apparently in python lists are "more
# efficient"?
tuple(val)
for val in spawnspec.reg_addrs
]
else:
setattr(self, attr, value)
# for attr, value in parent_data.items():
for _, attr, value in pretty_struct.iter_fields(
spawnspec,
):
setattr(self, attr, value)
# if (
# attr == 'reg_addrs'
# and value
# ):
# self.reg_addrs = [tuple(val) for val in value]
# else:
# setattr(self, attr, value)
return chan, accept_addrs
return (
chan,
accept_addrs,
)
except OSError: # failed to connect
log.warning(
@ -1432,7 +1490,7 @@ class Actor:
self,
chan: Channel
) -> tuple[str, str]:
) -> msgtypes.Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
@ -1441,14 +1499,27 @@ class Actor:
"actor model" parlance.
'''
await chan.send(self.uid)
value: tuple = await chan.recv()
uid: tuple[str, str] = (str(value[0]), str(value[1]))
name, uuid = self.uid
await chan.send(
msgtypes.Aid(
name=name,
uuid=uuid,
)
)
aid: msgtypes.Aid = await chan.recv()
chan.aid = aid
uid: tuple[str, str] = (
# str(value[0]),
# str(value[1])
aid.name,
aid.uuid,
)
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = str(uid[0]), str(uid[1])
chan.uid = uid
return uid
def is_infected_aio(self) -> bool:
@ -1508,7 +1579,8 @@ async def async_main(
# because we're running in mp mode
if (
set_accept_addr_says_rent
and set_accept_addr_says_rent is not None
and
set_accept_addr_says_rent is not None
):
accept_addrs = set_accept_addr_says_rent
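
`Actor._do_handshake()` above now exchanges an `Aid` ("actor id") struct instead of a bare `(name, uuid)` tuple, and the parent's init data arrives as a `SpawnSpec`. A small sketch of the identifier half of that handshake, with a hypothetical `Aid` definition and an in-memory stand-in for the channel:

```python
import uuid

import msgspec


class Aid(msgspec.Struct, tag=True):
    name: str
    uuid: str


enc = msgspec.msgpack.Encoder()
dec = msgspec.msgpack.Decoder(type=Aid)

# each side ships its own id-struct as the first msg on the channel..
wire: bytes = enc.encode(Aid(name='gemini', uuid=str(uuid.uuid4())))

# ..and the receiver can still rebuild the legacy `chan.uid` tuple
# from it, as the updated `_do_handshake()` does.
aid: Aid = dec.decode(wire)
chan_uid: tuple[str, str] = (aid.name, aid.uuid)
print(chan_uid)
```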


@ -49,6 +49,9 @@ from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor.msg.types import (
SpawnSpec,
)
if TYPE_CHECKING:
@ -489,14 +492,25 @@ async def trio_proc(
portal,
)
# send additional init params
await chan.send({
'_parent_main_data': subactor._parent_main_data,
'enable_modules': subactor.enable_modules,
'reg_addrs': subactor.reg_addrs,
'bind_addrs': bind_addrs,
'_runtime_vars': _runtime_vars,
})
# send a "spawning specification" which configures the
# initial runtime state of the child.
await chan.send(
SpawnSpec(
_parent_main_data=subactor._parent_main_data,
enable_modules=subactor.enable_modules,
reg_addrs=subactor.reg_addrs,
bind_addrs=bind_addrs,
_runtime_vars=_runtime_vars,
)
)
# await chan.send({
# '_parent_main_data': subactor._parent_main_data,
# 'enable_modules': subactor.enable_modules,
# 'reg_addrs': subactor.reg_addrs,
# 'bind_addrs': bind_addrs,
# '_runtime_vars': _runtime_vars,
# })
# track subactor in current nursery
curr_actor = current_actor()
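
`trio_proc()` above now sends the child's initial runtime state as a `SpawnSpec` struct rather than a loose dict. One related detail from the `_runtime.py` hunk: msgpack has no tuple type, so `(host, port)` pairs come back as lists after a schema-less round trip, which is why the child still rebuilds `reg_addrs` entries with `tuple(val)`. A trimmed-down, hypothetical `SpawnSpec` demonstrating that:

```python
import msgspec


class SpawnSpec(msgspec.Struct):
    # trimmed down: the real spec also carries `_parent_main_data`,
    # `enable_modules`, `_runtime_vars`, etc.
    bind_addrs: list
    reg_addrs: list


spec = SpawnSpec(
    bind_addrs=[('127.0.0.1', 0)],
    reg_addrs=[('127.0.0.1', 1616)],
)

# schema-less round trip: tuples degrade to lists over msgpack..
raw: dict = msgspec.msgpack.decode(msgspec.msgpack.encode(spec))
assert raw['reg_addrs'] == [['127.0.0.1', 1616]]

# ..hence the manual re-casting the child does in `_runtime.py`:
reg_addrs = [tuple(val) for val in raw['reg_addrs']]
assert reg_addrs == [('127.0.0.1', 1616)]
```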


@ -43,6 +43,11 @@ from .trionics import (
broadcast_receiver,
BroadcastReceiver,
)
from tractor.msg import (
Stop,
Yield,
Error,
)
if TYPE_CHECKING:
from ._context import Context
@ -94,21 +99,25 @@ class MsgStream(trio.abc.Channel):
self,
allow_msg_keys: list[str] = ['yield'],
):
msg: dict = self._rx_chan.receive_nowait()
# msg: dict = self._rx_chan.receive_nowait()
msg: Yield|Stop = self._rx_chan.receive_nowait()
for (
i,
key,
) in enumerate(allow_msg_keys):
try:
return msg[key]
except KeyError as kerr:
# return msg[key]
return msg.pld
# except KeyError as kerr:
except AttributeError as attrerr:
if i < (len(allow_msg_keys) - 1):
continue
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
# src_err=kerr,
src_err=attrerr,
log=log,
expect_key=key,
stream=self,
@ -148,18 +157,22 @@ class MsgStream(trio.abc.Channel):
src_err: Exception|None = None # orig tb
try:
try:
msg = await self._rx_chan.receive()
return msg['yield']
msg: Yield = await self._rx_chan.receive()
# return msg['yield']
return msg.pld
except KeyError as kerr:
src_err = kerr
# except KeyError as kerr:
except AttributeError as attrerr:
# src_err = kerr
src_err = attrerr
# NOTE: may raise any of the below error types
# including EoC when a 'stop' msg is found.
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
# src_err=kerr,
src_err=attrerr,
log=log,
expect_key='yield',
stream=self,
@ -514,11 +527,18 @@ class MsgStream(trio.abc.Channel):
raise self._closed
try:
# await self._ctx.chan.send(
# payload={
# 'yield': data,
# 'cid': self._ctx.cid,
# },
# # hide_tb=hide_tb,
# )
await self._ctx.chan.send(
payload={
'yield': data,
'cid': self._ctx.cid,
},
payload=Yield(
cid=self._ctx.cid,
pld=data,
),
# hide_tb=hide_tb,
)
except (
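
`MsgStream.send()`/`.receive()` above now wrap and unwrap stream values in `Yield` structs (`msg.pld`) instead of `{'yield': ..., 'cid': ...}` dicts. A toy in-memory round trip of that wrapping, with a stand-in `Yield` and a `trio` memory channel playing the part of the IPC transport:

```python
from typing import Any

import msgspec
import trio


class Yield(msgspec.Struct, tag=True):
    cid: str
    pld: Any = None


async def main() -> None:
    send_chan, recv_chan = trio.open_memory_channel(8)

    # sender side: wrap each value in a `Yield`, as `.send()` now does
    await send_chan.send(Yield(cid='s1', pld={'price': 101.5}))

    # receiver side: unwrap via `.pld`, as `.receive()` now does
    msg: Yield = await recv_chan.receive()
    assert msg.pld == {'price': 101.5}


trio.run(main)
```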


@ -935,6 +935,9 @@ async def _pause(
# ``breakpoint()`` was awaited and begin handling stdio.
log.debug('Entering sync world of the `pdb` REPL..')
try:
# log.critical(
# f'stack len: {len(pdb.stack)}\n'
# )
debug_func(
actor,
pdb,