Compare commits
15 Commits
b209990d04 ... 9b4f580470

Author | SHA1 | Date
---|---|---
Tyler Goodlet | 9b4f580470 |
Tyler Goodlet | d51be2a36a |
Tyler Goodlet | 3018187228 |
Tyler Goodlet | e5f0b450cf |
Tyler Goodlet | 4aa24f8518 |
Tyler Goodlet | d2f6428e46 |
Tyler Goodlet | 5439060cd3 |
Tyler Goodlet | 7372404d76 |
Tyler Goodlet | 77a15ebf19 |
Tyler Goodlet | d0e7610073 |
Tyler Goodlet | a73b24cf4a |
Tyler Goodlet | 5dfff3f75a |
Tyler Goodlet | d4155396bf |
Tyler Goodlet | 3869e91b19 |
Tyler Goodlet | 829dfa7520 |
@@ -14,19 +14,20 @@ from typing import (
from contextvars import (
Context,
)
# from inspect import Parameter

from msgspec import (
structs,
msgpack,
# defstruct,
Struct,
ValidationError,
)
import pytest

import tractor
from tractor import _state
from tractor import (
_state,
MsgTypeError,
)
from tractor.msg import (
_codec,
_ctxvar_MsgCodec,

@@ -47,21 +48,6 @@ from tractor.msg.types import (
import trio


def test_msg_spec_xor_pld_spec():
'''
If the `.msg.types.Msg`-set is overridden, we
can't also support a `Msg.pld` spec.

'''
# apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types.
with pytest.raises(RuntimeError):
mk_codec(
ipc_msg_spec=Any,
ipc_pld_spec=NamespacePath,
)


def mk_custom_codec(
pld_spec: Union[Type]|Any,
add_hooks: bool,

@@ -134,7 +120,9 @@ def mk_custom_codec(
f'{uid}\n'
'FAILED DECODE\n'
f'type-> {obj_type}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n\n'
f'current codec:\n'
f'{current_codec()}\n'
)
# TODO: figure out the ignore subsys for this!
# -[ ] option whether to defense-relay backc the msg

@@ -409,7 +397,9 @@ async def send_back_values(
pld_spec=ipc_pld_spec,
add_hooks=add_hooks,
)
with apply_codec(nsp_codec) as codec:
with (
apply_codec(nsp_codec) as codec,
):
chk_codec_applied(
expect_codec=nsp_codec,
enter_value=codec,

@@ -459,7 +449,7 @@ async def send_back_values(
# XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
# `str` handling! or special debug mode IPC
# msgs!
# await tractor.pause()
await tractor.pause()

raise RuntimeError(
f'NOT-EXPECTED able to roundtrip value given spec:\n'

@@ -470,7 +460,8 @@ async def send_back_values(
break # move on to streaming block..

except tractor.MsgTypeError:
# await tractor.pause()
await tractor.pause()

if expect_send:
raise RuntimeError(
f'EXPECTED to `.started()` value given spec:\n'

@@ -652,12 +643,42 @@ def test_codec_hooks_mod(

pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)

# XXX should raise an mte (`MsgTypeError`)
# when `add_codec_hooks == False` bc the input
# `expect_ipc_send` kwarg has a nsp which can't be
# serialized!
#
# TODO:can we ensure this happens from the
# `Return`-side (aka the sub) as well?
if not add_codec_hooks:
try:
async with p.open_context(
send_back_values,
expect_debug=debug_mode,
pld_spec_type_strs=pld_spec_type_strs,
add_hooks=add_codec_hooks,
started_msg_bytes=nsp_codec.encode(expected_started),

# XXX NOTE bc we send a `NamespacePath` in this kwarg
expect_ipc_send=expect_ipc_send,

) as (ctx, first):
pytest.fail('ctx should fail to open without custom enc_hook!?')

# this test passes bc we can go no further!
except MsgTypeError:
# teardown nursery
await p.cancel_actor()
return

# TODO: send the original nsp here and
# test with `limit_msg_spec()` above?
# await tractor.pause()
print('PARENT opening IPC ctx!\n')
async with (

# XXX should raise an mte (`MsgTypeError`)
# when `add_codec_hooks == False`..
p.open_context(
send_back_values,
expect_debug=debug_mode,

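For orientation, here is a minimal usage sketch of the codec API exercised by the test hunks above; it is illustrative only and assumes the `mk_codec()`/`apply_codec()` signatures shown in this diff (a payload-spec-only codec may still need custom enc/dec hooks to actually round-trip `NamespacePath` values over IPC):

# illustrative sketch, not part of the diff
from typing import Any

import pytest
from tractor.msg import NamespacePath
from tractor.msg._codec import apply_codec, mk_codec


def demo_codec_usage() -> None:
    # overriding the full IPC msg-spec AND a payload-spec at the
    # same time is rejected, as `test_msg_spec_xor_pld_spec()` asserts:
    with pytest.raises(RuntimeError):
        mk_codec(
            ipc_msg_spec=Any,
            ipc_pld_spec=NamespacePath,
        )

    # a payload-spec-only codec can be applied to the current task,
    # mirroring the `with apply_codec(nsp_codec) as codec:` block above:
    nsp_codec = mk_codec(ipc_pld_spec=NamespacePath)
    with apply_codec(nsp_codec) as codec:
        assert codec is nsp_codec
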
@@ -26,6 +26,7 @@ disjoint, parallel executing tasks in separate actors.
from __future__ import annotations
from collections import deque
from contextlib import asynccontextmanager as acm
from contextvars import ContextVar
from dataclasses import (
dataclass,
field,

@@ -56,6 +57,7 @@ from ._exceptions import (
)
from .log import get_logger
from .msg import (
_codec,
Error,
MsgType,
MsgCodec,

@@ -80,6 +82,9 @@ if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
from ._ipc import MsgTransport
from .devx._code import (
CallerInfo,
)


log = get_logger(__name__)

@@ -499,6 +504,18 @@ class Context:
_started_called: bool = False
_stream_opened: bool = False
_stream: MsgStream|None = None
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
'pld_codec',
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
)

@property
def pld_codec(self) -> MsgCodec|None:
return self._pld_codec_var.get()

# caller of `Portal.open_context()` for
# logging purposes mostly
_caller_info: CallerInfo|None = None

# overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote

@@ -525,6 +542,7 @@ class Context:

# TODO: figure out how we can enforce this without losing our minds..
_strict_started: bool = False
_cancel_on_msgerr: bool = True

def __str__(self) -> str:
ds: str = '='

@@ -857,6 +875,7 @@ class Context:
# TODO: never do this right?
# if self._remote_error:
# return
peer_side: str = self.peer_side(self.side)

# XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this

@@ -864,14 +883,15 @@ class Context:
# appropriately.
log.runtime(
'Setting remote error for ctx\n\n'
f'<= remote ctx uid: {self.chan.uid}\n'
f'=>{error}'
f'<= {peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n'
f'{error}'
)
self._remote_error: BaseException = error

# self-cancel (ack) or,
# peer propagated remote cancellation.
msgtyperr: bool = False
msgerr: bool = False
if isinstance(error, ContextCancelled):

whom: str = (

@@ -884,7 +904,7 @@ class Context:
)

elif isinstance(error, MsgTypeError):
msgtyperr = True
msgerr = True
peer_side: str = self.peer_side(self.side)
log.error(
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'

@@ -935,13 +955,24 @@ class Context:
and not self._is_self_cancelled()
and not cs.cancel_called
and not cs.cancelled_caught
and not msgtyperr
and (
msgerr
and
# NOTE: allow user to config not cancelling the
# local scope on `MsgTypeError`s
self._cancel_on_msgerr
)
):
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
log.cancel('Cancelling local `.open_context()` scope!')
self._scope.cancel()

else:
log.cancel('NOT cancelling local `.open_context()` scope!')


# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?

@@ -966,9 +997,7 @@ class Context:
dmaddr = dst_maddr

@property
def repr_rpc(
self,
) -> str:
def repr_rpc(self) -> str:
# TODO: how to show the transport interchange fmt?
# codec: str = self.chan.transport.codec_key
outcome_str: str = self.repr_outcome(

@@ -980,6 +1009,27 @@ class Context:
f'{self._nsf}() -> {outcome_str}:'
)

@property
def repr_caller(self) -> str:
ci: CallerInfo|None = self._caller_info
if ci:
return (
f'{ci.caller_nsp}()'
# f'|_api: {ci.api_nsp}'
)

return '<UNKNOWN caller-frame>'

@property
def repr_api(self) -> str:
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
# f'{ci.api_nsp}()\n'
# )

return 'Portal.open_context()'

async def cancel(
self,
timeout: float = 0.616,

@@ -1184,8 +1234,9 @@ class Context:
)

# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the
# parent-ctx-task side (on the side that calls
# `Actor.start_remote_task()`) so if you try to send
# a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx: Context = actor.get_context(

@@ -1850,6 +1901,19 @@ class Context:
send_chan: trio.MemorySendChannel = self._send_chan
nsf: NamespacePath = self._nsf

side: str = self.side
if side == 'child':
assert not self._portal
peer_side: str = self.peer_side(side)

flow_body: str = (
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n'

f'=> {side!r}: {self._task}\n'
f' |_<{self.repr_api} @ {self.repr_caller}>\n\n'
)

re: Exception|None
if re := unpack_error(
msg,

@@ -1860,18 +1924,10 @@ class Context:
else:
log_meth = log.runtime

side: str = self.side

peer_side: str = self.peer_side(side)

log_meth(
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'

f'<= peer {peer_side!r}: {from_uid}\n'
f' |_ {nsf}()\n\n'

f'=> {side!r} cid: {cid}\n'
f' |_{self._task}\n\n'
f'{flow_body}'

f'{pformat(re)}\n'
)

@@ -1884,30 +1940,27 @@ class Context:
# or `RemoteActorError`).
self._maybe_cancel_and_set_remote_error(re)

# XXX only case where returning early is fine!
# TODO: expose as mod func instead!
structfmt = pretty_struct.Struct.pformat
if self._in_overrun:
log.warning(
f'Queueing OVERRUN msg on caller task:\n'
f'<= peer: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'Queueing OVERRUN msg on caller task:\n\n'

f'=> cid: {cid}\n'
f' |_{self._task}\n\n'
f'{flow_body}'

f'{structfmt(msg)}\n'
)
self._overflow_q.append(msg)

# XXX NOTE XXX
# overrun is the ONLY case where returning early is fine!
return False

try:
log.runtime(
f'Delivering msg from IPC ctx:\n'
f'<= {from_uid}\n'
f' |_ {nsf}()\n\n'
f'Delivering msg from IPC ctx:\n\n'

f'=> {self._task}\n'
f' |_cid={self.cid}\n\n'
f'{flow_body}'

f'{structfmt(msg)}\n'
)

@@ -1939,6 +1992,7 @@ class Context:
f'cid: {self.cid}\n'
'Failed to deliver msg:\n'
f'send_chan: {send_chan}\n\n'

f'{pformat(msg)}\n'
)
return False

@@ -2092,6 +2146,12 @@ async def open_context_from_portal(
'''
__tracebackhide__: bool = hide_tb

# denote this frame as a "runtime frame" for stack
# introspection where we report the caller code in logging
# and error message content.
# NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa

# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)

@@ -2119,6 +2179,8 @@ async def open_context_from_portal(
nsf=nsf,
kwargs=kwargs,

portal=portal,

# NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does
# not open a stream until after some slow startup/init

@@ -2129,13 +2191,17 @@ async def open_context_from_portal(
# place..
allow_overruns=allow_overruns,
)
# ASAP, so that `Context.side: str` can be determined for
# logging / tracing / debug!
ctx._portal: Portal = portal

assert ctx._remote_func_type == 'context'
msg: Started = await ctx._recv_chan.receive()
assert ctx._caller_info

# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
# NOT actually cancel the below line!
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
msg: Started = await ctx._recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.

@@ -2145,6 +2211,7 @@ async def open_context_from_portal(

# except KeyError as src_error:
except AttributeError as src_error:
log.exception('Raising from unexpected msg!\n')
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,

@@ -2570,7 +2637,6 @@ async def open_context_from_portal(
None,
)


def mk_context(
chan: Channel,
cid: str,

@@ -2592,6 +2658,10 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)

# TODO: only scan caller-info if log level so high!
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()

ctx = Context(
chan=chan,
cid=cid,

@@ -2600,6 +2670,7 @@ def mk_context(
_recv_chan=recv_chan,
_nsf=nsf,
_task=trio.lowlevel.current_task(),
_caller_info=caller_info,
**kwargs,
)
# TODO: we can drop the old placeholder yah?

@@ -2610,7 +2681,11 @@ def mk_context(

def context(func: Callable) -> Callable:
'''
Mark an async function as a streaming routine with ``@context``.
Mark an (async) function as an SC-supervised, inter-`Actor`,
child-`trio.Task`, IPC endpoint otherwise known more
colloquially as a (RPC) "context".

Functions annotated the fundamental IPC endpoint type offered by `tractor`.

'''
# TODO: apply whatever solution ``mypy`` ends up picking for this:

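The rewritten `@context` docstring above describes the decorated-endpoint pattern; a minimal sketch of that usage follows (illustrative only; the actor and function names here are made up):

# illustrative sketch, not part of the diff
import tractor
import trio


@tractor.context
async def echo_ctx(
    ctx: tractor.Context,
    msg: str,
) -> None:
    # sync with the parent task, delivering a first value
    await ctx.started(msg)


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',
            enable_modules=[__name__],
        )
        # open the SC-supervised, inter-actor "context"
        async with portal.open_context(
            echo_ctx,
            msg='hi',
        ) as (ctx, first):
            assert first == 'hi'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
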
@@ -935,7 +935,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
def _raise_from_no_key_in_msg(
ctx: Context,
msg: MsgType,
src_err: KeyError,
src_err: AttributeError,
log: StackLevelAdapter, # caller specific `log` obj

expect_msg: str = Yield,

@@ -994,7 +994,7 @@ def _raise_from_no_key_in_msg(
ctx.chan,
hide_tb=hide_tb,

) from None
) from src_err

# `MsgStream` termination msg.
# TODO: does it make more sense to pack

@@ -314,8 +314,7 @@ class MsgpackTCPStream(MsgTransport):

while True:
try:
header = await self.recv_stream.receive_exactly(4)

header: bytes = await self.recv_stream.receive_exactly(4)
except (
ValueError,
ConnectionResetError,

@@ -337,8 +336,7 @@ class MsgpackTCPStream(MsgTransport):
size, = struct.unpack("<I", header)

log.transport(f'received header {size}') # type: ignore

msg_bytes = await self.recv_stream.receive_exactly(size)
msg_bytes: bytes = await self.recv_stream.receive_exactly(size)

log.transport(f"received {msg_bytes}") # type: ignore
try:

@@ -161,17 +161,18 @@ class Portal:
self._expect_result = await self.actor.start_remote_task(
self.channel,
nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs
kwargs=kwargs,
portal=self,
)

async def _return_once(
self,
ctx: Context,

) -> dict[str, Any]:
) -> Return:

assert ctx._remote_func_type == 'asyncfunc' # single response
msg: dict = await ctx._recv_chan.receive()
msg: Return = await ctx._recv_chan.receive()
return msg

async def result(self) -> Any:

@@ -247,6 +248,8 @@ class Portal:
purpose.

'''
__runtimeframe__: int = 1 # noqa

chan: Channel = self.channel
if not chan.connected():
log.runtime(

@@ -324,16 +327,18 @@ class Portal:
internals!

'''
__runtimeframe__: int = 1 # noqa
nsf = NamespacePath(
f'{namespace_path}:{function_name}'
)
ctx = await self.actor.start_remote_task(
ctx: Context = await self.actor.start_remote_task(
chan=self.channel,
nsf=nsf,
kwargs=kwargs,
portal=self,
)
ctx._portal = self
msg = await self._return_once(ctx)
ctx._portal: Portal = self
msg: Return = await self._return_once(ctx)
return _unwrap_msg(
msg,
self.channel,

@@ -384,6 +389,7 @@ class Portal:
self.channel,
nsf=nsf,
kwargs=kwargs,
portal=self,
)
ctx._portal = self
return _unwrap_msg(

@@ -398,6 +404,14 @@ class Portal:
**kwargs,

) -> AsyncGenerator[MsgStream, None]:
'''
Legacy one-way streaming API.

TODO: re-impl on top `Portal.open_context()` + an async gen
around `Context.open_stream()`.

'''
__runtimeframe__: int = 1 # noqa

if not inspect.isasyncgenfunction(async_gen_func):
if not (

@@ -411,6 +425,7 @@ class Portal:
self.channel,
nsf=NamespacePath.from_ref(async_gen_func),
kwargs=kwargs,
portal=self,
)
ctx._portal = self

@@ -135,7 +135,7 @@ async def open_root_actor(

# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)

# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True

@@ -814,7 +814,7 @@ async def process_messages(
# should use it?
# https://github.com/python-trio/trio/issues/467
log.runtime(
'Entering IPC msg loop:\n'
'Entering RPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)

@@ -876,7 +876,7 @@ async def process_messages(
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
pdb_complete: trio.Event|None = _debug.DebugStatus.repl_release
if pdb_complete:
await pdb_complete.wait()

@@ -1073,7 +1073,7 @@ async def process_messages(
log.exception(message)
raise RuntimeError(message)

log.runtime(
log.transport(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'

@@ -267,10 +267,13 @@ class Actor:
self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None

# track each child/sub-actor in it's locally
# supervising nursery
self._actoruid2nursery: dict[
tuple[str, str],
tuple[str, str], # sub-`Actor.uid`
ActorNursery|None,
] = {} # type: ignore # noqa
] = {}

# when provided, init the registry addresses property from
# input via the validator.

@@ -659,12 +662,18 @@ class Actor:

# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
pdb_user_uid
and local_nursery
):
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
if entry:
proc: trio.Process
_, proc, _ = entry

@@ -674,10 +683,10 @@ class Actor:
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT '
'Root actor reports no-more-peers, BUT\n'
'a DISCONNECTED child still has the debug '
'lock!\n'
f'root uid: {self.uid}\n'
'lock!\n\n'
# f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)

@@ -703,9 +712,8 @@ class Actor:
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if (
db_cs
(db_cs := pdb_lock.get_locking_task_cs())
and not db_cs.cancel_called
and uid == pdb_user_uid
):

@@ -742,7 +750,7 @@ class Actor:
except KeyError:
log.warning(
'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n'
f'<= sender: {uid}\n\n'
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'

@@ -796,7 +804,7 @@ class Actor:
cid,
# side,
)]
log.runtime(
log.debug(
f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid:{cid}\n'

@@ -835,10 +843,14 @@ class Actor:
nsf: NamespacePath,
kwargs: dict,

# determines `Context.side: str`
portal: Portal|None = None,

# IPC channel config
msg_buffer_size: int|None = None,
allow_overruns: bool = False,
load_nsf: bool = False,
ack_timeout: float = 3,

) -> Context:
'''

@@ -863,10 +875,12 @@ class Actor:
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._portal = portal

if (
'self' in nsf
or not load_nsf
or
not load_nsf
):
ns, _, func = nsf.partition(':')
else:

@@ -874,42 +888,29 @@ class Actor:
# -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple()

log.runtime(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
)
await chan.send(
msgtypes.Start(
msg = msgtypes.Start(
ns=ns,
func=func,
kwargs=kwargs,
uid=self.uid,
cid=cid,
)
log.runtime(
'Sending RPC start msg\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n'
)
# {'cmd': (
# ns,
# func,
# kwargs,
# self.uid,
# cid,
# )}
# )

# Wait on first response msg and validate; this should be
# immediate.
# first_msg: dict = await ctx._recv_chan.receive()
# functype: str = first_msg.get('functype')
await chan.send(msg)

# NOTE wait on first `StartAck` response msg and validate;
# this should be immediate and does not (yet) wait for the
# remote child task to sync via `Context.started()`.
with trio.fail_after(ack_timeout):
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
try:
functype: str = first_msg.functype
except AttributeError:
raise unpack_error(first_msg, chan)
# if 'error' in first_msg:
# raise unpack_error(first_msg, chan)

if functype not in (
'asyncfunc',

@@ -917,7 +918,7 @@ class Actor:
'context',
):
raise ValueError(
f'{first_msg} is an invalid response packet?'
f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
)

ctx._remote_func_type = functype

@@ -1162,7 +1163,7 @@ class Actor:

# kill any debugger request task to avoid deadlock
# with the root actor in this tree
dbcs = _debug.Lock._debugger_request_cs
dbcs = _debug.DebugStatus.req_cs
if dbcs is not None:
msg += (
'>> Cancelling active debugger request..\n'

@@ -1237,9 +1238,9 @@ class Actor:
except KeyError:
# NOTE: during msging race conditions this will often
# emit, some examples:
# - callee returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
# - child returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request,
# - child errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n'

@@ -1305,12 +1306,12 @@ class Actor:
f'|_{ctx}\n'
)
log.runtime(
'Waiting on RPC task to cancel\n'
'Waiting on RPC task to cancel\n\n'
f'{flow_info}'
)
await is_complete.wait()
log.runtime(
f'Sucessfully cancelled RPC task\n'
f'Sucessfully cancelled RPC task\n\n'
f'{flow_info}'
)
return True

@@ -1536,8 +1537,8 @@ async def async_main(

'''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# on our debugger state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)

is_registered: bool = False
try:

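The new `ack_timeout` wait in `start_remote_task()` above relies on standard `trio` deadline semantics; a minimal sketch of just that timeout behaviour (illustrative only, plain `trio` primitives):

# illustrative sketch, not part of the diff
import trio


async def wait_for_ack(
    recv_chan: trio.MemoryReceiveChannel,
    ack_timeout: float = 3,
):
    # `trio.fail_after()` raises `trio.TooSlowError` if the block
    # does not complete before the deadline, which is how the
    # `StartAck` wait above is now bounded.
    with trio.fail_after(ack_timeout):
        return await recv_chan.receive()
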
@@ -30,11 +30,16 @@ if TYPE_CHECKING:

_current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None

# TODO: mk this a `msgspec.Struct`!
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
'_is_root': False,
'_root_mailbox': (None, None),
'_registry_addrs': [],

# for `breakpoint()` support
'use_greenback': False,
}


@@ -61,7 +66,7 @@ def current_actor(
err_on_no_runtime
and _current_actor is None
):
msg: str = 'No local actor has been initialized yet'
msg: str = 'No local actor has been initialized yet?\n'
from ._exceptions import NoRuntime

if last := last_actor():

@@ -74,8 +79,8 @@ def current_actor(
# this process.
else:
msg += (
'No last actor found?\n'
'Did you forget to open one of:\n\n'
# 'No last actor found?\n'
'\nDid you forget to call one of,\n'
'- `tractor.open_root_actor()`\n'
'- `tractor.open_nursery()`\n'
)

|
|||
# await rx_chan.aclose()
|
||||
|
||||
if not self._eoc:
|
||||
log.cancel(
|
||||
'Stream closed by self before it received an EoC?\n'
|
||||
'Setting eoc manually..\n..'
|
||||
)
|
||||
self._eoc: bool = trio.EndOfChannel(
|
||||
f'Context stream closed by self({self._ctx.side})\n'
|
||||
message: str = (
|
||||
f'Context stream closed by {self._ctx.side!r}\n'
|
||||
f'|_{self}\n'
|
||||
)
|
||||
log.cancel(
|
||||
'Stream self-closed before receiving EoC\n\n'
|
||||
+
|
||||
message
|
||||
)
|
||||
self._eoc = trio.EndOfChannel(message)
|
||||
|
||||
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
||||
# => NO, DEFINITELY NOT! <=
|
||||
# if we're a bi-dir ``MsgStream`` BECAUSE this same
|
||||
|
|
|
@@ -131,7 +131,12 @@ class ActorNursery:
"main task" besides the runtime.

'''
loglevel = loglevel or self._actor.loglevel or get_loglevel()
__runtimeframe__: int = 1 # noqa
loglevel: str = (
loglevel
or self._actor.loglevel
or get_loglevel()
)

# configure and pass runtime state
_rtv = _state._runtime_vars.copy()

@@ -209,6 +214,7 @@ class ActorNursery:
the actor is terminated.

'''
__runtimeframe__: int = 1 # noqa
mod_path: str = fn.__module__

if name is None:

@@ -257,6 +263,7 @@ class ActorNursery:
directly without any far end graceful ``trio`` cancellation.

'''
__runtimeframe__: int = 1 # noqa
self.cancelled = True

# TODO: impl a repr for spawn more compact

@@ -27,7 +27,6 @@ from ._debug import (
pause as pause,
pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler,
MultiActorPdb as MultiActorPdb,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem,

@@ -0,0 +1,177 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
Tools for code-object annotation, introspection and mutation
as it pertains to improving the grok-ability of our runtime!

'''
from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
from types import (
FrameType,
FunctionType,
MethodType,
# CodeType,
)
from typing import (
# Any,
Callable,
# TYPE_CHECKING,
Type,
)

from tractor.msg import (
pretty_struct,
NamespacePath,
)


# TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func
# obj..
def get_class_from_frame(fr: FrameType) -> (
FunctionType
|MethodType
):
'''
Attempt to get the function (or method) reference
from a given `FrameType`.

Verbatim from an SO:
https://stackoverflow.com/a/2220759

'''
args, _, _, value_dict = inspect.getargvalues(fr)

# we check the first parameter for the frame function is
# named 'self'
if (
len(args)
and
# TODO: other cases for `@classmethod` etc..?)
args[0] == 'self'
):
# in that case, 'self' will be referenced in value_dict
instance: object = value_dict.get('self')
if instance:
# return its class
return getattr(
instance,
'__class__',
None,
)

# return None otherwise
return None


def func_ref_from_frame(
frame: FrameType,
) -> Callable:
func_name: str = frame.f_code.co_name
try:
return frame.f_globals[func_name]
except KeyError:
cls: Type|None = get_class_from_frame(frame)
if cls:
return getattr(
cls,
func_name,
)


# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
rt_fi: inspect.FrameInfo
call_frame: FrameType

@property
def api_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.rt_fi.frame)

@property
def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func_ref
if func:
return NamespacePath.from_ref(func)

return '<unknown>'

@property
def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)

@property
def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.caller_func_ref
if func:
return NamespacePath.from_ref(func)

return '<unknown>'


def find_caller_info(
dunder_var: str = '__runtimeframe__',
iframes:int = 1,
check_frame_depth: bool = True,

) -> CallerInfo|None:
'''
Scan up the callstack for a frame with a `dunder_var: str` variable
and return the `iframes` frames above it.

By default we scan for a `__runtimeframe__` scope var which
denotes a `tractor` API above which (one frame up) is "user
app code" which "called into" the `tractor` method or func.

TODO: ex with `Portal.open_context()`

'''
# TODO: use this instead?
# https://docs.python.org/3/library/inspect.html#inspect.getouterframes
frames: list[inspect.FrameInfo] = inspect.stack()
for fi in frames:
assert (
fi.function
==
fi.frame.f_code.co_name
)
this_frame: FrameType = fi.frame
dunder_val: int|None = this_frame.f_locals.get(dunder_var)
if dunder_val:
go_up_iframes: int = (
dunder_val # could be 0 or `True` i guess?
or
iframes
)
rt_frame: FrameType = fi.frame
call_frame = rt_frame
for i in range(go_up_iframes):
call_frame = call_frame.f_back

return CallerInfo(
rt_fi=fi,
call_frame=call_frame,
)

return None

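A minimal sketch of how the new `find_caller_info()` machinery above is meant to be used (illustrative only; the function names here are made up, but the `__runtimeframe__` marker matches the annotations added throughout this changeset):

# illustrative sketch, not part of the diff
from tractor.devx._code import CallerInfo, find_caller_info


def some_runtime_api() -> CallerInfo | None:
    # the dunder scanned for by `find_caller_info()`; its value is
    # how many frames up the "user" caller lives.
    __runtimeframe__: int = 1  # noqa
    return find_caller_info()


def user_code() -> None:
    info = some_runtime_api()
    if info:
        # logging can now render `user_code()` as the calling frame
        print(info.caller_nsp)
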
File diff suppressed because it is too large.

@@ -23,12 +23,31 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.

'''
from __future__ import annotations
import multiprocessing as mp
from signal import (
signal,
SIGUSR1,
)
import traceback
from typing import TYPE_CHECKING

import trio
from tractor import (
_state,
log as logmod,
)

log = logmod.get_logger(__name__)


if TYPE_CHECKING:
from tractor._spawn import ProcessType
from tractor import (
Actor,
ActorNursery,
)


@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:

@@ -41,9 +60,15 @@ def dump_task_tree() -> None:
recurse_child_tasks=True
)
)
log = get_console_log('cancel')
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor()
log.pdb(
f'Dumping `stackscope` tree:\n\n'
f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
f'{tree_str}\n'
)
# import logging

@@ -56,8 +81,13 @@ def dump_task_tree() -> None:
# ).exception("Error printing task tree")


def signal_handler(sig: int, frame: object) -> None:
import traceback
def signal_handler(
sig: int,
frame: object,

relay_to_subs: bool = True,

) -> None:
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)

@@ -65,6 +95,26 @@ def signal_handler(sig: int, frame: object) -> None:
# not in async context -- print a normal traceback
traceback.print_stack()

if not relay_to_subs:
return

an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values():

subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.pdb(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'
)

if isinstance(subproc, trio.Process):
subproc.send_signal(sig)

elif isinstance(subproc, mp.Process):
subproc._send_signal(sig)


def enable_stack_on_sig(

@@ -82,3 +132,6 @@ def enable_stack_on_sig(
# NOTE: not the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
#
# for example if you were looking to trace a `pytest` run
# kill -SIGUSR1 @$(pgrep -f 'pytest')

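For reference, the signal wiring described above boils down to the stdlib pattern below (illustrative only; the real handler also dumps the `stackscope` tree and relays the signal to sub-actors, and `SIGUSR1` is POSIX-only):

# illustrative sketch, not part of the diff
import signal


def handler(sig: int, frame: object) -> None:
    print(f'got signal {sig!r}')


signal.signal(signal.SIGUSR1, handler)
# then, from a shell:
#   kill -SIGUSR1 $(pgrep -f '<cmd>')
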
@@ -33,25 +33,29 @@ from __future__ import annotations
from contextlib import (
contextmanager as cm,
)
# from contextvars import (
# ContextVar,
# Token,
# )
from contextvars import (
ContextVar,
Token,
)
import textwrap
from typing import (
Any,
Callable,
Type,
TYPE_CHECKING,
Union,
)
from types import ModuleType

import msgspec
from msgspec import msgpack
from trio.lowlevel import (
RunVar,
RunVarToken,
from msgspec import (
msgpack,
Raw,
)
# from trio.lowlevel import (
# RunVar,
# RunVarToken,
# )
# TODO: see notes below from @mikenerone..
# from tricycle import TreeVar

@@ -62,6 +66,9 @@ from tractor.msg.types import (
)
from tractor.log import get_logger

if TYPE_CHECKING:
from tractor._context import Context

log = get_logger(__name__)

# TODO: overall IPC msg-spec features (i.e. in this mod)!

@@ -157,24 +164,6 @@ class MsgCodec(Struct):

lib: ModuleType = msgspec

# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }

# TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property
@property

@@ -210,7 +199,25 @@ class MsgCodec(Struct):
# https://jcristharif.com/msgspec/usage.html#typed-decoding
return self._dec.decode(msg)

# TODO: do we still want to try and support the sub-decoder with
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
#
# -[ ] do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives
# future grief?
#

@@ -429,6 +436,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
#
_def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any,

# TODO: use this for debug mode locking prot?
# ipc_pld_spec=Raw,
)
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing

@@ -462,11 +472,9 @@ _def_tractor_codec: MsgCodec = mk_codec(

# TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired..
_ctxvar_MsgCodec: MsgCodec = RunVar(
# _ctxvar_MsgCodec: MsgCodec = RunVar(
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec',

# TODO: move this to our new `Msg`-spec!
# default=_def_msgspec_codec,
default=_def_tractor_codec,
)

@@ -475,23 +483,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
def apply_codec(
codec: MsgCodec,

ctx: Context|None = None,

) -> MsgCodec:
'''
Dynamically apply a `MsgCodec` to the current task's
runtime context such that all IPC msgs are processed
with it for that task.
Dynamically apply a `MsgCodec` to the current task's runtime
context such that all (of a certain class of payload
containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
processed with it for that task.

Uses a `contextvars.ContextVar` to ensure the scope of any
codec setting matches the current `Context` or
`._rpc.process_messages()` feeder task's prior setting without
mutating any surrounding scope.

When a `ctx` is supplied, only mod its `Context.pld_codec`.

Uses a `tricycle.TreeVar` to ensure the scope of the codec
matches the `@cm` block and DOES NOT change to the original
(default) value in new tasks (as it does for `ContextVar`).

See the docs:
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py

'''
__tracebackhide__: bool = True
orig: MsgCodec = _ctxvar_MsgCodec.get()

if ctx is not None:
var: ContextVar = ctx._var_pld_codec
else:
# use IPC channel-connection "global" codec
var: ContextVar = _ctxvar_MsgCodec

orig: MsgCodec = var.get()

assert orig is not codec
if codec.pld_spec is None:
breakpoint()

@@ -500,22 +521,25 @@ def apply_codec(
'Applying new msg-spec codec\n\n'
f'{codec}\n'
)
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
token: Token = var.set(codec)

# TODO: for TreeVar approach, see docs for @cm `.being()` API:
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# try:
# ?TODO? for TreeVar approach which copies from the
# cancel-scope of the prior value, NOT the prior task
# See the docs:
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
# ^- see docs for @cm `.being()` API
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec

try:
yield _ctxvar_MsgCodec.get()
yield var.get()
finally:
_ctxvar_MsgCodec.reset(token)
var.reset(token)

assert _ctxvar_MsgCodec.get() is orig
assert var.get() is orig
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'

|
|||
return self._ref
|
||||
|
||||
@staticmethod
|
||||
def _mk_fqnp(ref: type | object) -> tuple[str, str]:
|
||||
def _mk_fqnp(
|
||||
ref: type|object,
|
||||
) -> tuple[str, str]:
|
||||
'''
|
||||
Generate a minial ``str`` pair which describes a python
|
||||
Generate a minial `str` pair which describes a python
|
||||
object's namespace path and object/type name.
|
||||
|
||||
In more precise terms something like:
|
||||
|
@ -87,10 +89,9 @@ class NamespacePath(str):
|
|||
of THIS type XD
|
||||
|
||||
'''
|
||||
if (
|
||||
isfunction(ref)
|
||||
):
|
||||
if isfunction(ref):
|
||||
name: str = getattr(ref, '__name__')
|
||||
mod_name: str = ref.__module__
|
||||
|
||||
elif ismethod(ref):
|
||||
# build out the path manually i guess..?
|
||||
|
@ -99,15 +100,19 @@ class NamespacePath(str):
|
|||
type(ref.__self__).__name__,
|
||||
ref.__func__.__name__,
|
||||
])
|
||||
mod_name: str = ref.__self__.__module__
|
||||
|
||||
else: # object or other?
|
||||
# isinstance(ref, object)
|
||||
# and not isfunction(ref)
|
||||
name: str = type(ref).__name__
|
||||
mod_name: str = ref.__module__
|
||||
|
||||
# TODO: return static value direactly?
|
||||
#
|
||||
# fully qualified namespace path, tuple.
|
||||
fqnp: tuple[str, str] = (
|
||||
ref.__module__,
|
||||
mod_name,
|
||||
name,
|
||||
)
|
||||
return fqnp
|
||||
|
|