Compare commits
No commits in common. "9b4f5804706b2c7effcf9c39d579c3c2bdb41e25" and "b209990d045c9069b11279916dc529bbe3152b0b" have entirely different histories.
9b4f580470 ... b209990d04

The hunks below are shown as unified diffs: `-` lines are from the 9b4f580470 side, `+` lines from the b209990d04 side.
@@ -14,20 +14,19 @@ from typing import (
 from contextvars import (
 Context,
 )
+# from inspect import Parameter

 from msgspec import (
 structs,
 msgpack,
+# defstruct,
 Struct,
 ValidationError,
 )
 import pytest

 import tractor
-from tractor import (
-_state,
-MsgTypeError,
-)
+from tractor import _state
 from tractor.msg import (
 _codec,
 _ctxvar_MsgCodec,
@@ -48,6 +47,21 @@ from tractor.msg.types import (
 import trio


+def test_msg_spec_xor_pld_spec():
+'''
+If the `.msg.types.Msg`-set is overridden, we
+can't also support a `Msg.pld` spec.
+
+'''
+# apply custom hooks and set a `Decoder` which only
+# loads `NamespacePath` types.
+with pytest.raises(RuntimeError):
+mk_codec(
+ipc_msg_spec=Any,
+ipc_pld_spec=NamespacePath,
+)
+
+
 def mk_custom_codec(
 pld_spec: Union[Type]|Any,
 add_hooks: bool,
@@ -120,9 +134,7 @@ def mk_custom_codec(
 f'{uid}\n'
 'FAILED DECODE\n'
 f'type-> {obj_type}\n'
-f'obj-arg-> `{obj}`: {type(obj)}\n\n'
-f'current codec:\n'
-f'{current_codec()}\n'
+f'obj-arg-> `{obj}`: {type(obj)}\n'
 )
 # TODO: figure out the ignore subsys for this!
 # -[ ] option whether to defense-relay backc the msg
@@ -397,9 +409,7 @@ async def send_back_values(
 pld_spec=ipc_pld_spec,
 add_hooks=add_hooks,
 )
-with (
-apply_codec(nsp_codec) as codec,
-):
+with apply_codec(nsp_codec) as codec:
 chk_codec_applied(
 expect_codec=nsp_codec,
 enter_value=codec,
@@ -449,7 +459,7 @@ async def send_back_values(
 # XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
 # `str` handling! or special debug mode IPC
 # msgs!
-await tractor.pause()
+# await tractor.pause()

 raise RuntimeError(
 f'NOT-EXPECTED able to roundtrip value given spec:\n'
@@ -460,8 +470,7 @@ async def send_back_values(
 break  # move on to streaming block..

 except tractor.MsgTypeError:
-await tractor.pause()
+# await tractor.pause()

 if expect_send:
 raise RuntimeError(
 f'EXPECTED to `.started()` value given spec:\n'
@@ -643,42 +652,12 @@ def test_codec_hooks_mod(

 pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)

-# XXX should raise an mte (`MsgTypeError`)
-# when `add_codec_hooks == False` bc the input
-# `expect_ipc_send` kwarg has a nsp which can't be
-# serialized!
-#
-# TODO:can we ensure this happens from the
-# `Return`-side (aka the sub) as well?
-if not add_codec_hooks:
-try:
-async with p.open_context(
-send_back_values,
-expect_debug=debug_mode,
-pld_spec_type_strs=pld_spec_type_strs,
-add_hooks=add_codec_hooks,
-started_msg_bytes=nsp_codec.encode(expected_started),
-
-# XXX NOTE bc we send a `NamespacePath` in this kwarg
-expect_ipc_send=expect_ipc_send,
-
-) as (ctx, first):
-pytest.fail('ctx should fail to open without custom enc_hook!?')
-
-# this test passes bc we can go no further!
-except MsgTypeError:
-# teardown nursery
-await p.cancel_actor()
-return
-
 # TODO: send the original nsp here and
 # test with `limit_msg_spec()` above?
 # await tractor.pause()
 print('PARENT opening IPC ctx!\n')
 async with (

-# XXX should raise an mte (`MsgTypeError`)
-# when `add_codec_hooks == False`..
 p.open_context(
 send_back_values,
 expect_debug=debug_mode,
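The test-file hunks above exercise msgspec encode/decode hooks for a non-native payload type (tractor's `NamespacePath`). The following is only an illustrative sketch of that hook mechanism using plain `msgspec.msgpack` and a toy `Point` stand-in type; it is not tractor's `mk_codec()` API.

from msgspec import msgpack

class Point:
    # toy stand-in for a custom wire type (the diff uses `NamespacePath`)
    def __init__(self, x: float, y: float):
        self.x, self.y = x, y

def enc_hook(obj):
    # called for any type msgspec can't natively encode
    if isinstance(obj, Point):
        return (obj.x, obj.y)  # encode as a plain 2-element array
    raise NotImplementedError(f'no encoder for {type(obj)}')

def dec_hook(type_, obj):
    # called to rebuild custom annotated types on decode
    if type_ is Point:
        return Point(*obj)
    raise NotImplementedError(f'no decoder for {type_}')

enc = msgpack.Encoder(enc_hook=enc_hook)
dec = msgpack.Decoder(Point, dec_hook=dec_hook)

pt = dec.decode(enc.encode(Point(1.0, 2.0)))
assert (pt.x, pt.y) == (1.0, 2.0)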
@@ -26,7 +26,6 @@ disjoint, parallel executing tasks in separate actors.
 from __future__ import annotations
 from collections import deque
 from contextlib import asynccontextmanager as acm
-from contextvars import ContextVar
 from dataclasses import (
 dataclass,
 field,
@@ -57,7 +56,6 @@ from ._exceptions import (
 )
 from .log import get_logger
 from .msg import (
-_codec,
 Error,
 MsgType,
 MsgCodec,
@@ -82,9 +80,6 @@ if TYPE_CHECKING:
 from ._portal import Portal
 from ._runtime import Actor
 from ._ipc import MsgTransport
-from .devx._code import (
-CallerInfo,
-)


 log = get_logger(__name__)
@@ -504,18 +499,6 @@ class Context:
 _started_called: bool = False
 _stream_opened: bool = False
 _stream: MsgStream|None = None
-_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
-'pld_codec',
-default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
-)
-
-@property
-def pld_codec(self) -> MsgCodec|None:
-return self._pld_codec_var.get()
-
-# caller of `Portal.open_context()` for
-# logging purposes mostly
-_caller_info: CallerInfo|None = None

 # overrun handling machinery
 # NOTE: none of this provides "backpressure" to the remote
@@ -542,7 +525,6 @@ class Context:

 # TODO: figure out how we can enforce this without losing our minds..
 _strict_started: bool = False
-_cancel_on_msgerr: bool = True

 def __str__(self) -> str:
 ds: str = '='
@@ -875,7 +857,6 @@ class Context:
 # TODO: never do this right?
 # if self._remote_error:
 # return
-peer_side: str = self.peer_side(self.side)

 # XXX: denote and set the remote side's error so that
 # after we cancel whatever task is the opener of this
@@ -883,15 +864,14 @@ class Context:
 # appropriately.
 log.runtime(
 'Setting remote error for ctx\n\n'
-f'<= {peer_side!r}: {self.chan.uid}\n'
-f'=> {self.side!r}\n\n'
-f'{error}'
+f'<= remote ctx uid: {self.chan.uid}\n'
+f'=>{error}'
 )
 self._remote_error: BaseException = error

 # self-cancel (ack) or,
 # peer propagated remote cancellation.
-msgerr: bool = False
+msgtyperr: bool = False
 if isinstance(error, ContextCancelled):

 whom: str = (
@@ -904,7 +884,7 @@ class Context:
 )

 elif isinstance(error, MsgTypeError):
-msgerr = True
+msgtyperr = True
 peer_side: str = self.peer_side(self.side)
 log.error(
 f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
@@ -955,24 +935,13 @@ class Context:
 and not self._is_self_cancelled()
 and not cs.cancel_called
 and not cs.cancelled_caught
-and (
-msgerr
-and
-# NOTE: allow user to config not cancelling the
-# local scope on `MsgTypeError`s
-self._cancel_on_msgerr
-)
+and not msgtyperr
 ):
 # TODO: it'd sure be handy to inject our own
 # `trio.Cancelled` subtype here ;)
 # https://github.com/goodboy/tractor/issues/368
-log.cancel('Cancelling local `.open_context()` scope!')
 self._scope.cancel()
-
-else:
-log.cancel('NOT cancelling local `.open_context()` scope!')
-

 # TODO: maybe we should also call `._res_scope.cancel()` if it
 # exists to support cancelling any drain loop hangs?

@@ -997,7 +966,9 @@ class Context:
 dmaddr = dst_maddr

 @property
-def repr_rpc(self) -> str:
+def repr_rpc(
+self,
+) -> str:
 # TODO: how to show the transport interchange fmt?
 # codec: str = self.chan.transport.codec_key
 outcome_str: str = self.repr_outcome(
@@ -1009,27 +980,6 @@ class Context:
 f'{self._nsf}() -> {outcome_str}:'
 )

-@property
-def repr_caller(self) -> str:
-ci: CallerInfo|None = self._caller_info
-if ci:
-return (
-f'{ci.caller_nsp}()'
-# f'|_api: {ci.api_nsp}'
-)
-
-return '<UNKNOWN caller-frame>'
-
-@property
-def repr_api(self) -> str:
-# ci: CallerInfo|None = self._caller_info
-# if ci:
-# return (
-# f'{ci.api_nsp}()\n'
-# )
-
-return 'Portal.open_context()'
-
 async def cancel(
 self,
 timeout: float = 0.616,
@@ -1234,9 +1184,8 @@ class Context:
 )

 # NOTE: in one way streaming this only happens on the
-# parent-ctx-task side (on the side that calls
-# `Actor.start_remote_task()`) so if you try to send
-# a stop from the caller to the callee in the
+# caller side inside `Actor.start_remote_task()` so if you try
+# to send a stop from the caller to the callee in the
 # single-direction-stream case you'll get a lookup error
 # currently.
 ctx: Context = actor.get_context(
@@ -1901,19 +1850,6 @@ class Context:
 send_chan: trio.MemorySendChannel = self._send_chan
 nsf: NamespacePath = self._nsf

-side: str = self.side
-if side == 'child':
-assert not self._portal
-peer_side: str = self.peer_side(side)
-
-flow_body: str = (
-f'<= peer {peer_side!r}: {from_uid}\n'
-f' |_<{nsf}()>\n\n'
-
-f'=> {side!r}: {self._task}\n'
-f' |_<{self.repr_api} @ {self.repr_caller}>\n\n'
-)
-
 re: Exception|None
 if re := unpack_error(
 msg,
@@ -1924,10 +1860,18 @@ class Context:
 else:
 log_meth = log.runtime

+side: str = self.side
+
+peer_side: str = self.peer_side(side)
+
 log_meth(
 f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'

-f'{flow_body}'
+f'<= peer {peer_side!r}: {from_uid}\n'
+f' |_ {nsf}()\n\n'
+
+f'=> {side!r} cid: {cid}\n'
+f' |_{self._task}\n\n'

 f'{pformat(re)}\n'
 )
@@ -1940,27 +1884,30 @@ class Context:
 # or `RemoteActorError`).
 self._maybe_cancel_and_set_remote_error(re)

-# TODO: expose as mod func instead!
+# XXX only case where returning early is fine!
 structfmt = pretty_struct.Struct.pformat
 if self._in_overrun:
 log.warning(
-f'Queueing OVERRUN msg on caller task:\n\n'
+f'Queueing OVERRUN msg on caller task:\n'
+f'<= peer: {from_uid}\n'
+f' |_ {nsf}()\n\n'

-f'{flow_body}'
+f'=> cid: {cid}\n'
+f' |_{self._task}\n\n'

 f'{structfmt(msg)}\n'
 )
 self._overflow_q.append(msg)
-
-# XXX NOTE XXX
-# overrun is the ONLY case where returning early is fine!
 return False

 try:
 log.runtime(
-f'Delivering msg from IPC ctx:\n\n'
+f'Delivering msg from IPC ctx:\n'
+f'<= {from_uid}\n'
+f' |_ {nsf}()\n\n'

-f'{flow_body}'
+f'=> {self._task}\n'
+f' |_cid={self.cid}\n\n'

 f'{structfmt(msg)}\n'
 )
@@ -1992,7 +1939,6 @@ class Context:
 f'cid: {self.cid}\n'
 'Failed to deliver msg:\n'
 f'send_chan: {send_chan}\n\n'
-
 f'{pformat(msg)}\n'
 )
 return False
@@ -2146,12 +2092,6 @@ async def open_context_from_portal(
 '''
 __tracebackhide__: bool = hide_tb

-# denote this frame as a "runtime frame" for stack
-# introspection where we report the caller code in logging
-# and error message content.
-# NOTE: 2 bc of the wrapping `@acm`
-__runtimeframe__: int = 2 # noqa
-
 # conduct target func method structural checks
 if not inspect.iscoroutinefunction(func) and (
 getattr(func, '_tractor_contex_function', False)
@@ -2179,8 +2119,6 @@ async def open_context_from_portal(
 nsf=nsf,
 kwargs=kwargs,
-
-portal=portal,

 # NOTE: it's imporant to expose this since you might
 # get the case where the parent who opened the context does
 # not open a stream until after some slow startup/init
@@ -2191,17 +2129,13 @@ async def open_context_from_portal(
 # place..
 allow_overruns=allow_overruns,
 )
-assert ctx._remote_func_type == 'context'
-assert ctx._caller_info
-
-# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
-# `Started`-msg any cancellation triggered
-# in `._maybe_cancel_and_set_remote_error()` will
-# NOT actually cancel the below line!
-# -> it's expected that if there is an error in this phase of
-# the dialog, the `Error` msg should be raised from the `msg`
-# handling block below.
+# ASAP, so that `Context.side: str` can be determined for
+# logging / tracing / debug!
+ctx._portal: Portal = portal
+
+assert ctx._remote_func_type == 'context'
 msg: Started = await ctx._recv_chan.receive()

 try:
 # the "first" value here is delivered by the callee's
 # ``Context.started()`` call.
@@ -2211,7 +2145,6 @@ async def open_context_from_portal(

 # except KeyError as src_error:
 except AttributeError as src_error:
-log.exception('Raising from unexpected msg!\n')
 _raise_from_no_key_in_msg(
 ctx=ctx,
 msg=msg,
@@ -2637,6 +2570,7 @@ async def open_context_from_portal(
 None,
 )

+
 def mk_context(
 chan: Channel,
 cid: str,
@@ -2658,10 +2592,6 @@ def mk_context(
 recv_chan: trio.MemoryReceiveChannel
 send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
-
-# TODO: only scan caller-info if log level so high!
-from .devx._code import find_caller_info
-caller_info: CallerInfo|None = find_caller_info()

 ctx = Context(
 chan=chan,
 cid=cid,
@@ -2670,7 +2600,6 @@ def mk_context(
 _recv_chan=recv_chan,
 _nsf=nsf,
 _task=trio.lowlevel.current_task(),
-_caller_info=caller_info,
 **kwargs,
 )
 # TODO: we can drop the old placeholder yah?
@@ -2681,11 +2610,7 @@ def mk_context(

 def context(func: Callable) -> Callable:
 '''
-Mark an (async) function as an SC-supervised, inter-`Actor`,
-child-`trio.Task`, IPC endpoint otherwise known more
-colloquially as a (RPC) "context".
-
-Functions annotated the fundamental IPC endpoint type offered by `tractor`.
-
+Mark an async function as a streaming routine with ``@context``.
 '''
 # TODO: apply whatever solution ``mypy`` ends up picking for this:

@@ -935,7 +935,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
 def _raise_from_no_key_in_msg(
 ctx: Context,
 msg: MsgType,
-src_err: AttributeError,
+src_err: KeyError,
 log: StackLevelAdapter, # caller specific `log` obj

 expect_msg: str = Yield,
@@ -994,7 +994,7 @@ def _raise_from_no_key_in_msg(
 ctx.chan,
 hide_tb=hide_tb,

-) from src_err
+) from None

 # `MsgStream` termination msg.
 # TODO: does it make more sense to pack

@@ -314,7 +314,8 @@ class MsgpackTCPStream(MsgTransport):

 while True:
 try:
-header: bytes = await self.recv_stream.receive_exactly(4)
+header = await self.recv_stream.receive_exactly(4)
+
 except (
 ValueError,
 ConnectionResetError,
@@ -336,7 +337,8 @@ class MsgpackTCPStream(MsgTransport):
 size, = struct.unpack("<I", header)

 log.transport(f'received header {size}') # type: ignore
-msg_bytes: bytes = await self.recv_stream.receive_exactly(size)
+
+msg_bytes = await self.recv_stream.receive_exactly(size)

 log.transport(f"received {msg_bytes}") # type: ignore
 try:
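The `MsgpackTCPStream` hunks above read a 4-byte little-endian length header followed by the msgpack payload. Below is a hedged sketch of that framing written against plain `trio.abc.ReceiveStream.receive_some()`; the `receive_exactly()` helper seen in the diff is assumed to come from the transport's buffered-stream wrapper, so a local equivalent is spelled out here.

import struct
import trio

async def receive_exactly(stream: trio.abc.ReceiveStream, n: int) -> bytes:
    # accumulate until exactly `n` bytes arrive; EOF mid-frame is an error
    buf = b''
    while len(buf) < n:
        chunk = await stream.receive_some(n - len(buf))
        if not chunk:
            raise EOFError('stream closed mid-frame')
        buf += chunk
    return buf

async def read_msg_frame(stream: trio.abc.ReceiveStream) -> bytes:
    header = await receive_exactly(stream, 4)
    size, = struct.unpack("<I", header)  # 4-byte little-endian length prefix
    return await receive_exactly(stream, size)  # raw msgpack payload bytes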
@@ -161,18 +161,17 @@ class Portal:
 self._expect_result = await self.actor.start_remote_task(
 self.channel,
 nsf=NamespacePath(f'{ns}:{func}'),
-kwargs=kwargs,
-portal=self,
+kwargs=kwargs
 )

 async def _return_once(
 self,
 ctx: Context,

-) -> Return:
+) -> dict[str, Any]:

 assert ctx._remote_func_type == 'asyncfunc' # single response
-msg: Return = await ctx._recv_chan.receive()
+msg: dict = await ctx._recv_chan.receive()
 return msg

 async def result(self) -> Any:
@@ -248,8 +247,6 @@ class Portal:
 purpose.

 '''
-__runtimeframe__: int = 1 # noqa
-
 chan: Channel = self.channel
 if not chan.connected():
 log.runtime(
@@ -327,18 +324,16 @@ class Portal:
 internals!

 '''
-__runtimeframe__: int = 1 # noqa
 nsf = NamespacePath(
 f'{namespace_path}:{function_name}'
 )
-ctx: Context = await self.actor.start_remote_task(
+ctx = await self.actor.start_remote_task(
 chan=self.channel,
 nsf=nsf,
 kwargs=kwargs,
-portal=self,
 )
-ctx._portal: Portal = self
-msg: Return = await self._return_once(ctx)
+ctx._portal = self
+msg = await self._return_once(ctx)
 return _unwrap_msg(
 msg,
 self.channel,
@@ -389,7 +384,6 @@ class Portal:
 self.channel,
 nsf=nsf,
 kwargs=kwargs,
-portal=self,
 )
 ctx._portal = self
 return _unwrap_msg(
@@ -404,14 +398,6 @@ class Portal:
 **kwargs,

 ) -> AsyncGenerator[MsgStream, None]:
-'''
-Legacy one-way streaming API.
-
-TODO: re-impl on top `Portal.open_context()` + an async gen
-around `Context.open_stream()`.
-
-'''
-__runtimeframe__: int = 1 # noqa

 if not inspect.isasyncgenfunction(async_gen_func):
 if not (
@@ -425,7 +411,6 @@ class Portal:
 self.channel,
 nsf=NamespacePath.from_ref(async_gen_func),
 kwargs=kwargs,
-portal=self,
 )
 ctx._portal = self


@@ -135,7 +135,7 @@ async def open_root_actor(

 # attempt to retreive ``trio``'s sigint handler and stash it
 # on our debugger lock state.
-_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
+_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)

 # mark top most level process as root actor
 _state._runtime_vars['_is_root'] = True

@@ -814,7 +814,7 @@ async def process_messages(
 # should use it?
 # https://github.com/python-trio/trio/issues/467
 log.runtime(
-'Entering RPC msg loop:\n'
+'Entering IPC msg loop:\n'
 f'peer: {chan.uid}\n'
 f'|_{chan}\n'
 )
@@ -876,7 +876,7 @@ async def process_messages(
 # XXX NOTE XXX don't start entire actor
 # runtime cancellation if this actor is
 # currently in debug mode!
-pdb_complete: trio.Event|None = _debug.DebugStatus.repl_release
+pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
 if pdb_complete:
 await pdb_complete.wait()

@@ -1073,7 +1073,7 @@ async def process_messages(
 log.exception(message)
 raise RuntimeError(message)

-log.transport(
+log.runtime(
 'Waiting on next IPC msg from\n'
 f'peer: {chan.uid}\n'
 f'|_{chan}\n'

@@ -267,13 +267,10 @@ class Actor:
 self._listeners: list[trio.abc.Listener] = []
 self._parent_chan: Channel|None = None
 self._forkserver_info: tuple|None = None
-
-# track each child/sub-actor in it's locally
-# supervising nursery
 self._actoruid2nursery: dict[
-tuple[str, str], # sub-`Actor.uid`
+tuple[str, str],
 ActorNursery|None,
-] = {}
+] = {} # type: ignore # noqa

 # when provided, init the registry addresses property from
 # input via the validator.
@@ -662,18 +659,12 @@ class Actor:

 # TODO: NEEEDS TO BE TESTED!
 # actually, no idea if this ever even enters.. XD
-#
-# XXX => YES IT DOES, when i was testing ctl-c
-# from broken debug TTY locking due to
-# msg-spec races on application using RunVar...
 pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
 if (
 pdb_user_uid
 and local_nursery
 ):
-entry: tuple|None = local_nursery._children.get(
-tuple(pdb_user_uid)
-)
+entry: tuple|None = local_nursery._children.get(pdb_user_uid)
 if entry:
 proc: trio.Process
 _, proc, _ = entry
@@ -683,10 +674,10 @@ class Actor:
 and poll() is None
 ):
 log.cancel(
-'Root actor reports no-more-peers, BUT\n'
+'Root actor reports no-more-peers, BUT '
 'a DISCONNECTED child still has the debug '
-'lock!\n\n'
-# f'root uid: {self.uid}\n'
+'lock!\n'
+f'root uid: {self.uid}\n'
 f'last disconnected child uid: {uid}\n'
 f'locking child uid: {pdb_user_uid}\n'
 )
@@ -712,8 +703,9 @@ class Actor:
 # if a now stale local task has the TTY lock still
 # we cancel it to allow servicing other requests for
 # the lock.
+db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
 if (
-(db_cs := pdb_lock.get_locking_task_cs())
+db_cs
 and not db_cs.cancel_called
 and uid == pdb_user_uid
 ):
@@ -750,7 +742,7 @@ class Actor:
 except KeyError:
 log.warning(
 'Ignoring invalid IPC ctx msg!\n\n'
-f'<= sender: {uid}\n\n'
+f'<= sender: {uid}\n'
 # XXX don't need right since it's always in msg?
 # f'=> cid: {cid}\n\n'

@@ -804,7 +796,7 @@ class Actor:
 cid,
 # side,
 )]
-log.debug(
+log.runtime(
 f'Retreived cached IPC ctx for\n'
 f'peer: {chan.uid}\n'
 f'cid:{cid}\n'
@@ -843,14 +835,10 @@ class Actor:
 nsf: NamespacePath,
 kwargs: dict,
-
-# determines `Context.side: str`
-portal: Portal|None = None,

 # IPC channel config
 msg_buffer_size: int|None = None,
 allow_overruns: bool = False,
 load_nsf: bool = False,
-ack_timeout: float = 3,

 ) -> Context:
 '''
@@ -875,12 +863,10 @@ class Actor:
 msg_buffer_size=msg_buffer_size,
 allow_overruns=allow_overruns,
 )
-ctx._portal = portal

 if (
 'self' in nsf
-or
-not load_nsf
+or not load_nsf
 ):
 ns, _, func = nsf.partition(':')
 else:
@@ -888,29 +874,42 @@ class Actor:
 # -[ ] but, how to do `self:<Actor.meth>`??
 ns, func = nsf.to_tuple()

-msg = msgtypes.Start(
-ns=ns,
-func=func,
-kwargs=kwargs,
-uid=self.uid,
-cid=cid,
-)
 log.runtime(
-'Sending RPC start msg\n\n'
-f'=> peer: {chan.uid}\n'
-f' |_ {ns}.{func}({kwargs})\n'
+'Sending cmd to\n'
+f'peer: {chan.uid} => \n'
+'\n'
+f'=> {ns}.{func}({kwargs})\n'
 )
-await chan.send(msg)
+await chan.send(
+msgtypes.Start(
+ns=ns,
+func=func,
+kwargs=kwargs,
+uid=self.uid,
+cid=cid,
+)
+)
+# {'cmd': (
+# ns,
+# func,
+# kwargs,
+# self.uid,
+# cid,
+# )}
+# )

-# NOTE wait on first `StartAck` response msg and validate;
-# this should be immediate and does not (yet) wait for the
-# remote child task to sync via `Context.started()`.
-with trio.fail_after(ack_timeout):
-first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
+# Wait on first response msg and validate; this should be
+# immediate.
+# first_msg: dict = await ctx._recv_chan.receive()
+# functype: str = first_msg.get('functype')
+first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
 try:
 functype: str = first_msg.functype
 except AttributeError:
 raise unpack_error(first_msg, chan)
+# if 'error' in first_msg:
+# raise unpack_error(first_msg, chan)

 if functype not in (
 'asyncfunc',
@@ -918,7 +917,7 @@ class Actor:
 'context',
 ):
 raise ValueError(
-f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
+f'{first_msg} is an invalid response packet?'
 )

 ctx._remote_func_type = functype
@@ -1163,7 +1162,7 @@ class Actor:

 # kill any debugger request task to avoid deadlock
 # with the root actor in this tree
-dbcs = _debug.DebugStatus.req_cs
+dbcs = _debug.Lock._debugger_request_cs
 if dbcs is not None:
 msg += (
 '>> Cancelling active debugger request..\n'
@@ -1238,9 +1237,9 @@ class Actor:
 except KeyError:
 # NOTE: during msging race conditions this will often
 # emit, some examples:
-# - child returns a result before cancel-msg/ctxc-raised
-# - child self raises ctxc before parent send request,
-# - child errors prior to cancel req.
+# - callee returns a result before cancel-msg/ctxc-raised
+# - callee self raises ctxc before caller send request,
+# - callee errors prior to cancel req.
 log.cancel(
 'Cancel request invalid, RPC task already completed?\n\n'
 f'<= canceller: {requesting_uid}\n\n'
@@ -1303,15 +1302,15 @@ class Actor:
 flow_info: str = (
 f'<= canceller: {requesting_uid}\n'
 f'=> ipc-parent: {parent_chan}\n'
-f'|_{ctx}\n'
+f' |_{ctx}\n'
 )
 log.runtime(
-'Waiting on RPC task to cancel\n\n'
+'Waiting on RPC task to cancel\n'
 f'{flow_info}'
 )
 await is_complete.wait()
 log.runtime(
-f'Sucessfully cancelled RPC task\n\n'
+f'Sucessfully cancelled RPC task\n'
 f'{flow_info}'
 )
 return True
@@ -1537,8 +1536,8 @@ async def async_main(

 '''
 # attempt to retreive ``trio``'s sigint handler and stash it
-# on our debugger state.
-_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
+# on our debugger lock state.
+_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)

 is_registered: bool = False
 try:

@@ -30,16 +30,11 @@ if TYPE_CHECKING:

 _current_actor: Actor|None = None # type: ignore # noqa
 _last_actor_terminated: Actor|None = None

-# TODO: mk this a `msgspec.Struct`!
 _runtime_vars: dict[str, Any] = {
 '_debug_mode': False,
 '_is_root': False,
 '_root_mailbox': (None, None),
 '_registry_addrs': [],
-
-# for `breakpoint()` support
-'use_greenback': False,
 }


@@ -66,7 +61,7 @@ def current_actor(
 err_on_no_runtime
 and _current_actor is None
 ):
-msg: str = 'No local actor has been initialized yet?\n'
+msg: str = 'No local actor has been initialized yet'
 from ._exceptions import NoRuntime

 if last := last_actor():
@@ -79,8 +74,8 @@ def current_actor(
 # this process.
 else:
 msg += (
-# 'No last actor found?\n'
-'\nDid you forget to call one of,\n'
+'No last actor found?\n'
+'Did you forget to open one of:\n\n'
 '- `tractor.open_root_actor()`\n'
 '- `tractor.open_nursery()`\n'
 )

@@ -377,17 +377,14 @@ class MsgStream(trio.abc.Channel):
 # await rx_chan.aclose()

 if not self._eoc:
-message: str = (
-f'Context stream closed by {self._ctx.side!r}\n'
+log.cancel(
+'Stream closed by self before it received an EoC?\n'
+'Setting eoc manually..\n..'
+)
+self._eoc: bool = trio.EndOfChannel(
+f'Context stream closed by self({self._ctx.side})\n'
 f'|_{self}\n'
 )
-log.cancel(
-'Stream self-closed before receiving EoC\n\n'
-+
-message
-)
-self._eoc = trio.EndOfChannel(message)

 # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
 # => NO, DEFINITELY NOT! <=
 # if we're a bi-dir ``MsgStream`` BECAUSE this same

@@ -131,12 +131,7 @@ class ActorNursery:
 "main task" besides the runtime.

 '''
-__runtimeframe__: int = 1 # noqa
-loglevel: str = (
-loglevel
-or self._actor.loglevel
-or get_loglevel()
-)
+loglevel = loglevel or self._actor.loglevel or get_loglevel()

 # configure and pass runtime state
 _rtv = _state._runtime_vars.copy()
@@ -214,7 +209,6 @@ class ActorNursery:
 the actor is terminated.

 '''
-__runtimeframe__: int = 1 # noqa
 mod_path: str = fn.__module__

 if name is None:
@@ -263,7 +257,6 @@ class ActorNursery:
 directly without any far end graceful ``trio`` cancellation.

 '''
-__runtimeframe__: int = 1 # noqa
 self.cancelled = True

 # TODO: impl a repr for spawn more compact

@@ -27,6 +27,7 @@ from ._debug import (
 pause as pause,
 pause_from_sync as pause_from_sync,
 shield_sigint_handler as shield_sigint_handler,
+MultiActorPdb as MultiActorPdb,
 open_crash_handler as open_crash_handler,
 maybe_open_crash_handler as maybe_open_crash_handler,
 post_mortem as post_mortem,

@@ -1,177 +0,0 @@
-# tractor: structured concurrent "actors".
-# Copyright 2018-eternity Tyler Goodlet.
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-'''
-Tools for code-object annotation, introspection and mutation
-as it pertains to improving the grok-ability of our runtime!
-
-'''
-from __future__ import annotations
-import inspect
-# import msgspec
-# from pprint import pformat
-from types import (
-FrameType,
-FunctionType,
-MethodType,
-# CodeType,
-)
-from typing import (
-# Any,
-Callable,
-# TYPE_CHECKING,
-Type,
-)
-
-from tractor.msg import (
-pretty_struct,
-NamespacePath,
-)
-
-
-# TODO: yeah, i don't love this and we should prolly just
-# write a decorator that actually keeps a stupid ref to the func
-# obj..
-def get_class_from_frame(fr: FrameType) -> (
-FunctionType
-|MethodType
-):
-'''
-Attempt to get the function (or method) reference
-from a given `FrameType`.
-
-Verbatim from an SO:
-https://stackoverflow.com/a/2220759
-
-'''
-args, _, _, value_dict = inspect.getargvalues(fr)
-
-# we check the first parameter for the frame function is
-# named 'self'
-if (
-len(args)
-and
-# TODO: other cases for `@classmethod` etc..?)
-args[0] == 'self'
-):
-# in that case, 'self' will be referenced in value_dict
-instance: object = value_dict.get('self')
-if instance:
-# return its class
-return getattr(
-instance,
-'__class__',
-None,
-)
-
-# return None otherwise
-return None
-
-
-def func_ref_from_frame(
-frame: FrameType,
-) -> Callable:
-func_name: str = frame.f_code.co_name
-try:
-return frame.f_globals[func_name]
-except KeyError:
-cls: Type|None = get_class_from_frame(frame)
-if cls:
-return getattr(
-cls,
-func_name,
-)
-
-
-# TODO: move all this into new `.devx._code`!
-# -[ ] prolly create a `@runtime_api` dec?
-# -[ ] ^- make it capture and/or accept buncha optional
-# meta-data like a fancier version of `@pdbp.hideframe`.
-#
-class CallerInfo(pretty_struct.Struct):
-rt_fi: inspect.FrameInfo
-call_frame: FrameType
-
-@property
-def api_func_ref(self) -> Callable|None:
-return func_ref_from_frame(self.rt_fi.frame)
-
-@property
-def api_nsp(self) -> NamespacePath|None:
-func: FunctionType = self.api_func_ref
-if func:
-return NamespacePath.from_ref(func)
-
-return '<unknown>'
-
-@property
-def caller_func_ref(self) -> Callable|None:
-return func_ref_from_frame(self.call_frame)
-
-@property
-def caller_nsp(self) -> NamespacePath|None:
-func: FunctionType = self.caller_func_ref
-if func:
-return NamespacePath.from_ref(func)
-
-return '<unknown>'
-
-
-def find_caller_info(
-dunder_var: str = '__runtimeframe__',
-iframes:int = 1,
-check_frame_depth: bool = True,
-
-) -> CallerInfo|None:
-'''
-Scan up the callstack for a frame with a `dunder_var: str` variable
-and return the `iframes` frames above it.
-
-By default we scan for a `__runtimeframe__` scope var which
-denotes a `tractor` API above which (one frame up) is "user
-app code" which "called into" the `tractor` method or func.
-
-TODO: ex with `Portal.open_context()`
-
-'''
-# TODO: use this instead?
-# https://docs.python.org/3/library/inspect.html#inspect.getouterframes
-frames: list[inspect.FrameInfo] = inspect.stack()
-for fi in frames:
-assert (
-fi.function
-==
-fi.frame.f_code.co_name
-)
-this_frame: FrameType = fi.frame
-dunder_val: int|None = this_frame.f_locals.get(dunder_var)
-if dunder_val:
-go_up_iframes: int = (
-dunder_val # could be 0 or `True` i guess?
-or
-iframes
-)
-rt_frame: FrameType = fi.frame
-call_frame = rt_frame
-for i in range(go_up_iframes):
-call_frame = call_frame.f_back
-
-return CallerInfo(
-rt_fi=fi,
-call_frame=call_frame,
-)
-
-return None
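The module removed in the hunk above (`-` side) centers on `find_caller_info()`, which walks `inspect.stack()` for a `__runtimeframe__` marker local and then reports the frame that many levels above it. A condensed, hedged sketch of that stack-scanning idea follows; it is a simplification for illustration, not the deleted module verbatim.

import inspect

def find_marked_caller(dunder: str = '__runtimeframe__') -> inspect.Traceback | None:
    # walk up the call stack until a frame declares the marker local,
    # then report the frame that many levels above it (the "user" caller)
    for fi in inspect.stack():
        n = fi.frame.f_locals.get(dunder)
        if n:
            caller = fi.frame
            for _ in range(int(n)):
                caller = caller.f_back or caller
            return inspect.getframeinfo(caller)
    return None

def runtime_api() -> inspect.Traceback | None:
    __runtimeframe__: int = 1  # noqa: mark this frame as "runtime" code
    return find_marked_caller()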
File diff suppressed because it is too large.
@@ -23,31 +23,12 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
 disjoint, parallel executing tasks in separate actors.

 '''
-from __future__ import annotations
-import multiprocessing as mp
 from signal import (
 signal,
 SIGUSR1,
 )
-import traceback
-from typing import TYPE_CHECKING

 import trio
-from tractor import (
-_state,
-log as logmod,
-)
-
-log = logmod.get_logger(__name__)
-
-
-if TYPE_CHECKING:
-from tractor._spawn import ProcessType
-from tractor import (
-Actor,
-ActorNursery,
-)
-

 @trio.lowlevel.disable_ki_protection
 def dump_task_tree() -> None:
@@ -60,15 +41,9 @@ def dump_task_tree() -> None:
 recurse_child_tasks=True
 )
 )
-log = get_console_log(
-name=__name__,
-level='cancel',
-)
-actor: Actor = _state.current_actor()
+log = get_console_log('cancel')
 log.pdb(
-f'Dumping `stackscope` tree for actor\n'
-f'{actor.name}: {actor}\n'
-f' |_{mp.current_process()}\n\n'
+f'Dumping `stackscope` tree:\n\n'
 f'{tree_str}\n'
 )
 # import logging
@@ -81,13 +56,8 @@ def dump_task_tree() -> None:
 # ).exception("Error printing task tree")


-def signal_handler(
-sig: int,
-frame: object,
-
-relay_to_subs: bool = True,
-
-) -> None:
+def signal_handler(sig: int, frame: object) -> None:
+import traceback
 try:
 trio.lowlevel.current_trio_token(
 ).run_sync_soon(dump_task_tree)
@@ -95,26 +65,6 @@ def signal_handler(
 # not in async context -- print a normal traceback
 traceback.print_stack()

-if not relay_to_subs:
-return
-
-an: ActorNursery
-for an in _state.current_actor()._actoruid2nursery.values():
-
-subproc: ProcessType
-subactor: Actor
-for subactor, subproc, _ in an._children.values():
-log.pdb(
-f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
-f'{subactor}\n'
-f' |_{subproc}\n'
-)
-
-if isinstance(subproc, trio.Process):
-subproc.send_signal(sig)
-
-elif isinstance(subproc, mp.Process):
-subproc._send_signal(sig)


 def enable_stack_on_sig(
@@ -132,6 +82,3 @@ def enable_stack_on_sig(
 # NOTE: not the above can be triggered from
 # a (xonsh) shell using:
 # kill -SIGUSR1 @$(pgrep -f '<cmd>')
-#
-# for example if you were looking to trace a `pytest` run
-# kill -SIGUSR1 @$(pgrep -f 'pytest')
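The stackscope hunks above wire `SIGUSR1` to a task-tree dump scheduled onto the running trio loop (the `-` side additionally relays the signal to sub-actor processes). Below is a minimal, hedged sketch of that signal-to-`run_sync_soon` pattern; the `_dump()` body is only a stand-in for the real `stackscope`-based tree rendering.

import signal
import traceback
import trio

def _dump() -> None:
    # stand-in for the real `stackscope` tree dump
    print(trio.lowlevel.current_root_task())

def _on_sigusr1(sig: int, frame) -> None:
    try:
        # hop onto the trio run loop if one is active in this thread
        trio.lowlevel.current_trio_token().run_sync_soon(_dump)
    except RuntimeError:
        # not inside a trio run -- fall back to a plain stack dump
        traceback.print_stack(frame)

signal.signal(signal.SIGUSR1, _on_sigusr1)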
@ -33,29 +33,25 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
from contextvars import (
|
# from contextvars import (
|
||||||
ContextVar,
|
# ContextVar,
|
||||||
Token,
|
# Token,
|
||||||
)
|
# )
|
||||||
import textwrap
|
import textwrap
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
Type,
|
Type,
|
||||||
TYPE_CHECKING,
|
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
from msgspec import (
|
from msgspec import msgpack
|
||||||
msgpack,
|
from trio.lowlevel import (
|
||||||
Raw,
|
RunVar,
|
||||||
|
RunVarToken,
|
||||||
)
|
)
|
||||||
# from trio.lowlevel import (
|
|
||||||
# RunVar,
|
|
||||||
# RunVarToken,
|
|
||||||
# )
|
|
||||||
# TODO: see notes below from @mikenerone..
|
# TODO: see notes below from @mikenerone..
|
||||||
# from tricycle import TreeVar
|
# from tricycle import TreeVar
|
||||||
|
|
||||||
|
@ -66,9 +62,6 @@ from tractor.msg.types import (
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from tractor._context import Context
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
||||||
|
@ -164,6 +157,24 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
lib: ModuleType = msgspec
|
lib: ModuleType = msgspec
|
||||||
|
|
||||||
|
# TODO: a sub-decoder system as well?
|
||||||
|
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||||
|
# see related comments in `.msg.types`
|
||||||
|
# _payload_decs: (
|
||||||
|
# dict[
|
||||||
|
# str,
|
||||||
|
# msgpack.Decoder,
|
||||||
|
# ]
|
||||||
|
# |None
|
||||||
|
# ) = None
|
||||||
|
# OR
|
||||||
|
# ) = {
|
||||||
|
# # pre-seed decoders for std-py-type-set for use when
|
||||||
|
# # `MsgType.pld == None|Any`.
|
||||||
|
# None: msgpack.Decoder(Any),
|
||||||
|
# Any: msgpack.Decoder(Any),
|
||||||
|
# }
|
||||||
|
|
||||||
# TODO: use `functools.cached_property` for these ?
|
# TODO: use `functools.cached_property` for these ?
|
||||||
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
||||||
@property
|
@property
|
||||||
|
@ -199,25 +210,7 @@ class MsgCodec(Struct):
|
||||||
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
||||||
return self._dec.decode(msg)
|
return self._dec.decode(msg)
|
||||||
|
|
||||||
# TODO: a sub-decoder system as well?
|
# TODO: do we still want to try and support the sub-decoder with
|
||||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
|
||||||
# see related comments in `.msg.types`
|
|
||||||
# _payload_decs: (
|
|
||||||
# dict[
|
|
||||||
# str,
|
|
||||||
# msgpack.Decoder,
|
|
||||||
# ]
|
|
||||||
# |None
|
|
||||||
# ) = None
|
|
||||||
# OR
|
|
||||||
# ) = {
|
|
||||||
# # pre-seed decoders for std-py-type-set for use when
|
|
||||||
# # `MsgType.pld == None|Any`.
|
|
||||||
# None: msgpack.Decoder(Any),
|
|
||||||
# Any: msgpack.Decoder(Any),
|
|
||||||
# }
|
|
||||||
#
|
|
||||||
# -[ ] do we still want to try and support the sub-decoder with
|
|
||||||
# `.Raw` technique in the case that the `Generic` approach gives
|
# `.Raw` technique in the case that the `Generic` approach gives
|
||||||
# future grief?
|
# future grief?
|
||||||
#
|
#
|
||||||
@@ -436,9 +429,6 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
 #
 _def_tractor_codec: MsgCodec = mk_codec(
     ipc_pld_spec=Any,
-
-    # TODO: use this for debug mode locking prot?
-    # ipc_pld_spec=Raw,
 )
 # TODO: IDEALLY provides for per-`trio.Task` specificity of the
 # IPC msging codec used by the transport layer when doing
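Note: a hedged usage sketch for the factory shown in this hunk: build a codec narrowed to a single payload type and install it with `apply_codec()` (changed further below). The import locations and the `NamespacePath` spec are assumptions, not taken from this diff:

    # Hedged sketch: narrow the payload spec to one type, then install
    # the codec for the calling task. Import paths are assumed.
    from tractor.msg import NamespacePath
    from tractor.msg._codec import (
        apply_codec,
        mk_codec,
    )

    nsp_codec = mk_codec(
        ipc_pld_spec=NamespacePath,
    )

    def limit_to_nsp() -> None:
        # meant to be called from inside an actor/task
        with apply_codec(nsp_codec) as codec:
            assert codec is nsp_codec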
@@ -472,9 +462,11 @@ _def_tractor_codec: MsgCodec = mk_codec(
 
 # TODO: STOP USING THIS, since it's basically a global and won't
 # allow sub-IPC-ctxs to limit the msg-spec however desired..
-# _ctxvar_MsgCodec: MsgCodec = RunVar(
-_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
+_ctxvar_MsgCodec: MsgCodec = RunVar(
     'msgspec_codec',
+
+    # TODO: move this to our new `Msg`-spec!
+    # default=_def_msgspec_codec,
     default=_def_tractor_codec,
 )
 
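Note: this hunk swaps the `contextvars.ContextVar` holder back to a `trio.lowlevel.RunVar`. A self-contained sketch (no tractor involved) of the scoping difference behind the TODO above: a `RunVar` is visible run-wide, while a `ContextVar` set inside one task stays local to it:

    # Scoping demo: RunVar is run-global, ContextVar stays task-local.
    from contextvars import ContextVar

    import trio
    from trio.lowlevel import RunVar

    _run_codec = RunVar('run_codec', default='def')
    _ctx_codec: ContextVar[str] = ContextVar('ctx_codec', default='def')


    async def setter() -> None:
        _run_codec.set('custom')
        _ctx_codec.set('custom')
        await trio.sleep(0.1)


    async def reader() -> None:
        await trio.sleep(0.05)
        assert _run_codec.get() == 'custom'  # visible run-wide
        assert _ctx_codec.get() == 'def'     # sibling task unaffected


    async def main() -> None:
        async with trio.open_nursery() as n:
            n.start_soon(setter)
            n.start_soon(reader)


    trio.run(main)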
@@ -483,36 +475,23 @@ _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
 def apply_codec(
     codec: MsgCodec,
 
-    ctx: Context|None = None,
-
 ) -> MsgCodec:
     '''
-    Dynamically apply a `MsgCodec` to the current task's runtime
-    context such that all (of a certain class of payload
-    containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
-    processed with it for that task.
+    Dynamically apply a `MsgCodec` to the current task's
+    runtime context such that all IPC msgs are processed
+    with it for that task.
 
-    Uses a `contextvars.ContextVar` to ensure the scope of any
-    codec setting matches the current `Context` or
-    `._rpc.process_messages()` feeder task's prior setting without
-    mutating any surrounding scope.
-
-    When a `ctx` is supplied, only mod its `Context.pld_codec`.
-
+    Uses a `tricycle.TreeVar` to ensure the scope of the codec
     matches the `@cm` block and DOES NOT change to the original
     (default) value in new tasks (as it does for `ContextVar`).
 
+    See the docs:
+    - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
+    - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
+
     '''
     __tracebackhide__: bool = True
-    if ctx is not None:
-        var: ContextVar = ctx._var_pld_codec
-    else:
-        # use IPC channel-connection "global" codec
-        var: ContextVar = _ctxvar_MsgCodec
-
-    orig: MsgCodec = var.get()
-
+    orig: MsgCodec = _ctxvar_MsgCodec.get()
     assert orig is not codec
     if codec.pld_spec is None:
         breakpoint()
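Note: independent of which variable type ends up holding the codec, `apply_codec()` is built around the usual set/yield/reset token dance. A generic sketch of that pattern with a plain `ContextVar` and no tractor types:

    # Generic set/yield/reset pattern over a plain ContextVar.
    from contextlib import contextmanager
    from contextvars import ContextVar, Token
    from typing import Iterator

    _current: ContextVar[str] = ContextVar('current', default='default-codec')


    @contextmanager
    def applying(value: str) -> Iterator[str]:
        token: Token = _current.set(value)
        try:
            yield _current.get()
        finally:
            # restore whatever was set before entry
            _current.reset(token)


    with applying('nsp-codec') as active:
        assert active == 'nsp-codec'

    assert _current.get() == 'default-codec'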
@@ -521,25 +500,22 @@ def apply_codec(
         'Applying new msg-spec codec\n\n'
         f'{codec}\n'
     )
-    token: Token = var.set(codec)
+    token: RunVarToken = _ctxvar_MsgCodec.set(codec)
 
-    # ?TODO? for TreeVar approach which copies from the
-    # cancel-scope of the prior value, NOT the prior task
-    # See the docs:
-    # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
-    # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
-    # ^- see docs for @cm `.being()` API
-    # with _ctxvar_MsgCodec.being(codec):
-    #     new = _ctxvar_MsgCodec.get()
-    #     assert new is codec
-    #     yield codec
+    # TODO: for TreeVar approach, see docs for @cm `.being()` API:
+    # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
+    # try:
+    #     with _ctxvar_MsgCodec.being(codec):
+    #         new = _ctxvar_MsgCodec.get()
+    #         assert new is codec
+    #         yield codec
 
     try:
-        yield var.get()
+        yield _ctxvar_MsgCodec.get()
     finally:
-        var.reset(token)
+        _ctxvar_MsgCodec.reset(token)
 
-    assert var.get() is orig
+    assert _ctxvar_MsgCodec.get() is orig
     log.info(
         'Reverted to last msg-spec codec\n\n'
         f'{orig}\n'
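Note: the commented-out alternative kept in this hunk points at tricycle's `TreeVar.being()` context manager. A heavily hedged sketch of what that branch might look like, assuming the `tricycle` package and its documented `TreeVar` API (`_tree_codec` and `swap_codec()` are illustrative names only):

    # Assumed API: tricycle.TreeVar with its `.being()` @cm, per the
    # docs linked above; must be used from inside a trio task tree.
    from contextlib import contextmanager
    from typing import Iterator

    from tricycle import TreeVar

    _tree_codec = TreeVar('tree_codec', default='default-codec')


    @contextmanager
    def swap_codec(value: str) -> Iterator[str]:
        # `.being()` scopes the value to this block and to any
        # sub-tasks spawned inside it, unlike a bare ContextVar.
        with _tree_codec.being(value):
            yield _tree_codec.get()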
@@ -76,11 +76,9 @@ class NamespacePath(str):
         return self._ref
 
     @staticmethod
-    def _mk_fqnp(
-        ref: type|object,
-    ) -> tuple[str, str]:
+    def _mk_fqnp(ref: type | object) -> tuple[str, str]:
         '''
-        Generate a minial `str` pair which describes a python
+        Generate a minial ``str`` pair which describes a python
         object's namespace path and object/type name.
 
         In more precise terms something like:
@@ -89,9 +87,10 @@ class NamespacePath(str):
         of THIS type XD
 
         '''
-        if isfunction(ref):
+        if (
+            isfunction(ref)
+        ):
             name: str = getattr(ref, '__name__')
-            mod_name: str = ref.__module__
 
         elif ismethod(ref):
             # build out the path manually i guess..?
@@ -100,19 +99,15 @@ class NamespacePath(str):
                 type(ref.__self__).__name__,
                 ref.__func__.__name__,
             ])
-            mod_name: str = ref.__self__.__module__
 
         else: # object or other?
             # isinstance(ref, object)
             # and not isfunction(ref)
             name: str = type(ref).__name__
-            mod_name: str = ref.__module__
 
-        # TODO: return static value direactly?
-        #
         # fully qualified namespace path, tuple.
         fqnp: tuple[str, str] = (
-            mod_name,
+            ref.__module__,
            name,
         )
         return fqnp
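Note: the `_mk_fqnp()` hunks above reduce a function, bound method, or plain object to a `(module, name)` pair. A worked standalone example using the same `inspect` predicates (the `fqnp_of()` helper name is illustrative, not tractor's):

    # Same (module, name) reduction as `_mk_fqnp()`, inlined.
    from inspect import isfunction, ismethod


    def fqnp_of(ref: type | object) -> tuple[str, str]:
        if isfunction(ref):
            name = ref.__name__
        elif ismethod(ref):
            # methods: qualify with the owning type's name
            name = '.'.join([
                type(ref.__self__).__name__,
                ref.__func__.__name__,
            ])
        else:  # plain object or type
            name = type(ref).__name__
        return (ref.__module__, name)


    def greet() -> str:
        return 'hi'


    assert fqnp_of(greet) == (__name__, 'greet')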
@@ -120,7 +115,7 @@ class NamespacePath(str):
     @classmethod
     def from_ref(
         cls,
-        ref: type|object,
+        ref: type | object,
 
     ) -> NamespacePath:
 
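Note: a minimal usage sketch for the `from_ref()` classmethod touched here; the `tractor.msg` import location, the `'<module>:<name>'` string form, and the `load_ref()` round-trip are assumptions about the surrounding class rather than anything shown in this hunk:

    # Assumed round-trip behaviour; only `from_ref()`'s signature is
    # shown in the hunk above.
    from tractor.msg import NamespacePath


    def ping() -> str:
        return 'pong'


    nsp = NamespacePath.from_ref(ping)
    assert isinstance(nsp, str)    # it subclasses `str`
    assert nsp.load_ref() is ping  # assumed inverse of `from_ref()`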