Compare commits
15 Commits
b209990d04
...
9b4f580470
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 9b4f580470 | |
Tyler Goodlet | d51be2a36a | |
Tyler Goodlet | 3018187228 | |
Tyler Goodlet | e5f0b450cf | |
Tyler Goodlet | 4aa24f8518 | |
Tyler Goodlet | d2f6428e46 | |
Tyler Goodlet | 5439060cd3 | |
Tyler Goodlet | 7372404d76 | |
Tyler Goodlet | 77a15ebf19 | |
Tyler Goodlet | d0e7610073 | |
Tyler Goodlet | a73b24cf4a | |
Tyler Goodlet | 5dfff3f75a | |
Tyler Goodlet | d4155396bf | |
Tyler Goodlet | 3869e91b19 | |
Tyler Goodlet | 829dfa7520 |
|
@ -14,19 +14,20 @@ from typing import (
|
||||||
from contextvars import (
|
from contextvars import (
|
||||||
Context,
|
Context,
|
||||||
)
|
)
|
||||||
# from inspect import Parameter
|
|
||||||
|
|
||||||
from msgspec import (
|
from msgspec import (
|
||||||
structs,
|
structs,
|
||||||
msgpack,
|
msgpack,
|
||||||
# defstruct,
|
|
||||||
Struct,
|
Struct,
|
||||||
ValidationError,
|
ValidationError,
|
||||||
)
|
)
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import _state
|
from tractor import (
|
||||||
|
_state,
|
||||||
|
MsgTypeError,
|
||||||
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
_codec,
|
_codec,
|
||||||
_ctxvar_MsgCodec,
|
_ctxvar_MsgCodec,
|
||||||
|
@ -47,21 +48,6 @@ from tractor.msg.types import (
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
||||||
def test_msg_spec_xor_pld_spec():
|
|
||||||
'''
|
|
||||||
If the `.msg.types.Msg`-set is overridden, we
|
|
||||||
can't also support a `Msg.pld` spec.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# apply custom hooks and set a `Decoder` which only
|
|
||||||
# loads `NamespacePath` types.
|
|
||||||
with pytest.raises(RuntimeError):
|
|
||||||
mk_codec(
|
|
||||||
ipc_msg_spec=Any,
|
|
||||||
ipc_pld_spec=NamespacePath,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def mk_custom_codec(
|
def mk_custom_codec(
|
||||||
pld_spec: Union[Type]|Any,
|
pld_spec: Union[Type]|Any,
|
||||||
add_hooks: bool,
|
add_hooks: bool,
|
||||||
|
@ -134,7 +120,9 @@ def mk_custom_codec(
|
||||||
f'{uid}\n'
|
f'{uid}\n'
|
||||||
'FAILED DECODE\n'
|
'FAILED DECODE\n'
|
||||||
f'type-> {obj_type}\n'
|
f'type-> {obj_type}\n'
|
||||||
f'obj-arg-> `{obj}`: {type(obj)}\n'
|
f'obj-arg-> `{obj}`: {type(obj)}\n\n'
|
||||||
|
f'current codec:\n'
|
||||||
|
f'{current_codec()}\n'
|
||||||
)
|
)
|
||||||
# TODO: figure out the ignore subsys for this!
|
# TODO: figure out the ignore subsys for this!
|
||||||
# -[ ] option whether to defense-relay backc the msg
|
# -[ ] option whether to defense-relay backc the msg
|
||||||
|
@ -409,7 +397,9 @@ async def send_back_values(
|
||||||
pld_spec=ipc_pld_spec,
|
pld_spec=ipc_pld_spec,
|
||||||
add_hooks=add_hooks,
|
add_hooks=add_hooks,
|
||||||
)
|
)
|
||||||
with apply_codec(nsp_codec) as codec:
|
with (
|
||||||
|
apply_codec(nsp_codec) as codec,
|
||||||
|
):
|
||||||
chk_codec_applied(
|
chk_codec_applied(
|
||||||
expect_codec=nsp_codec,
|
expect_codec=nsp_codec,
|
||||||
enter_value=codec,
|
enter_value=codec,
|
||||||
|
@ -459,7 +449,7 @@ async def send_back_values(
|
||||||
# XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
|
# XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
|
||||||
# `str` handling! or special debug mode IPC
|
# `str` handling! or special debug mode IPC
|
||||||
# msgs!
|
# msgs!
|
||||||
# await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
||||||
|
@ -470,7 +460,8 @@ async def send_back_values(
|
||||||
break # move on to streaming block..
|
break # move on to streaming block..
|
||||||
|
|
||||||
except tractor.MsgTypeError:
|
except tractor.MsgTypeError:
|
||||||
# await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
if expect_send:
|
if expect_send:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'EXPECTED to `.started()` value given spec:\n'
|
f'EXPECTED to `.started()` value given spec:\n'
|
||||||
|
@ -652,12 +643,42 @@ def test_codec_hooks_mod(
|
||||||
|
|
||||||
pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)
|
pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)
|
||||||
|
|
||||||
|
# XXX should raise an mte (`MsgTypeError`)
|
||||||
|
# when `add_codec_hooks == False` bc the input
|
||||||
|
# `expect_ipc_send` kwarg has a nsp which can't be
|
||||||
|
# serialized!
|
||||||
|
#
|
||||||
|
# TODO:can we ensure this happens from the
|
||||||
|
# `Return`-side (aka the sub) as well?
|
||||||
|
if not add_codec_hooks:
|
||||||
|
try:
|
||||||
|
async with p.open_context(
|
||||||
|
send_back_values,
|
||||||
|
expect_debug=debug_mode,
|
||||||
|
pld_spec_type_strs=pld_spec_type_strs,
|
||||||
|
add_hooks=add_codec_hooks,
|
||||||
|
started_msg_bytes=nsp_codec.encode(expected_started),
|
||||||
|
|
||||||
|
# XXX NOTE bc we send a `NamespacePath` in this kwarg
|
||||||
|
expect_ipc_send=expect_ipc_send,
|
||||||
|
|
||||||
|
) as (ctx, first):
|
||||||
|
pytest.fail('ctx should fail to open without custom enc_hook!?')
|
||||||
|
|
||||||
|
# this test passes bc we can go no further!
|
||||||
|
except MsgTypeError:
|
||||||
|
# teardown nursery
|
||||||
|
await p.cancel_actor()
|
||||||
|
return
|
||||||
|
|
||||||
# TODO: send the original nsp here and
|
# TODO: send the original nsp here and
|
||||||
# test with `limit_msg_spec()` above?
|
# test with `limit_msg_spec()` above?
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
print('PARENT opening IPC ctx!\n')
|
print('PARENT opening IPC ctx!\n')
|
||||||
async with (
|
async with (
|
||||||
|
|
||||||
|
# XXX should raise an mte (`MsgTypeError`)
|
||||||
|
# when `add_codec_hooks == False`..
|
||||||
p.open_context(
|
p.open_context(
|
||||||
send_back_values,
|
send_back_values,
|
||||||
expect_debug=debug_mode,
|
expect_debug=debug_mode,
|
||||||
|
|
|
@ -26,6 +26,7 @@ disjoint, parallel executing tasks in separate actors.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
from contextvars import ContextVar
|
||||||
from dataclasses import (
|
from dataclasses import (
|
||||||
dataclass,
|
dataclass,
|
||||||
field,
|
field,
|
||||||
|
@ -56,6 +57,7 @@ from ._exceptions import (
|
||||||
)
|
)
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
|
_codec,
|
||||||
Error,
|
Error,
|
||||||
MsgType,
|
MsgType,
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
|
@ -80,6 +82,9 @@ if TYPE_CHECKING:
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._ipc import MsgTransport
|
from ._ipc import MsgTransport
|
||||||
|
from .devx._code import (
|
||||||
|
CallerInfo,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -499,6 +504,18 @@ class Context:
|
||||||
_started_called: bool = False
|
_started_called: bool = False
|
||||||
_stream_opened: bool = False
|
_stream_opened: bool = False
|
||||||
_stream: MsgStream|None = None
|
_stream: MsgStream|None = None
|
||||||
|
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
|
||||||
|
'pld_codec',
|
||||||
|
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pld_codec(self) -> MsgCodec|None:
|
||||||
|
return self._pld_codec_var.get()
|
||||||
|
|
||||||
|
# caller of `Portal.open_context()` for
|
||||||
|
# logging purposes mostly
|
||||||
|
_caller_info: CallerInfo|None = None
|
||||||
|
|
||||||
# overrun handling machinery
|
# overrun handling machinery
|
||||||
# NOTE: none of this provides "backpressure" to the remote
|
# NOTE: none of this provides "backpressure" to the remote
|
||||||
|
@ -525,6 +542,7 @@ class Context:
|
||||||
|
|
||||||
# TODO: figure out how we can enforce this without losing our minds..
|
# TODO: figure out how we can enforce this without losing our minds..
|
||||||
_strict_started: bool = False
|
_strict_started: bool = False
|
||||||
|
_cancel_on_msgerr: bool = True
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
ds: str = '='
|
ds: str = '='
|
||||||
|
@ -857,6 +875,7 @@ class Context:
|
||||||
# TODO: never do this right?
|
# TODO: never do this right?
|
||||||
# if self._remote_error:
|
# if self._remote_error:
|
||||||
# return
|
# return
|
||||||
|
peer_side: str = self.peer_side(self.side)
|
||||||
|
|
||||||
# XXX: denote and set the remote side's error so that
|
# XXX: denote and set the remote side's error so that
|
||||||
# after we cancel whatever task is the opener of this
|
# after we cancel whatever task is the opener of this
|
||||||
|
@ -864,14 +883,15 @@ class Context:
|
||||||
# appropriately.
|
# appropriately.
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Setting remote error for ctx\n\n'
|
'Setting remote error for ctx\n\n'
|
||||||
f'<= remote ctx uid: {self.chan.uid}\n'
|
f'<= {peer_side!r}: {self.chan.uid}\n'
|
||||||
f'=>{error}'
|
f'=> {self.side!r}\n\n'
|
||||||
|
f'{error}'
|
||||||
)
|
)
|
||||||
self._remote_error: BaseException = error
|
self._remote_error: BaseException = error
|
||||||
|
|
||||||
# self-cancel (ack) or,
|
# self-cancel (ack) or,
|
||||||
# peer propagated remote cancellation.
|
# peer propagated remote cancellation.
|
||||||
msgtyperr: bool = False
|
msgerr: bool = False
|
||||||
if isinstance(error, ContextCancelled):
|
if isinstance(error, ContextCancelled):
|
||||||
|
|
||||||
whom: str = (
|
whom: str = (
|
||||||
|
@ -884,7 +904,7 @@ class Context:
|
||||||
)
|
)
|
||||||
|
|
||||||
elif isinstance(error, MsgTypeError):
|
elif isinstance(error, MsgTypeError):
|
||||||
msgtyperr = True
|
msgerr = True
|
||||||
peer_side: str = self.peer_side(self.side)
|
peer_side: str = self.peer_side(self.side)
|
||||||
log.error(
|
log.error(
|
||||||
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
|
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
|
||||||
|
@ -935,13 +955,24 @@ class Context:
|
||||||
and not self._is_self_cancelled()
|
and not self._is_self_cancelled()
|
||||||
and not cs.cancel_called
|
and not cs.cancel_called
|
||||||
and not cs.cancelled_caught
|
and not cs.cancelled_caught
|
||||||
and not msgtyperr
|
and (
|
||||||
|
msgerr
|
||||||
|
and
|
||||||
|
# NOTE: allow user to config not cancelling the
|
||||||
|
# local scope on `MsgTypeError`s
|
||||||
|
self._cancel_on_msgerr
|
||||||
|
)
|
||||||
):
|
):
|
||||||
# TODO: it'd sure be handy to inject our own
|
# TODO: it'd sure be handy to inject our own
|
||||||
# `trio.Cancelled` subtype here ;)
|
# `trio.Cancelled` subtype here ;)
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
|
log.cancel('Cancelling local `.open_context()` scope!')
|
||||||
self._scope.cancel()
|
self._scope.cancel()
|
||||||
|
|
||||||
|
else:
|
||||||
|
log.cancel('NOT cancelling local `.open_context()` scope!')
|
||||||
|
|
||||||
|
|
||||||
# TODO: maybe we should also call `._res_scope.cancel()` if it
|
# TODO: maybe we should also call `._res_scope.cancel()` if it
|
||||||
# exists to support cancelling any drain loop hangs?
|
# exists to support cancelling any drain loop hangs?
|
||||||
|
|
||||||
|
@ -966,9 +997,7 @@ class Context:
|
||||||
dmaddr = dst_maddr
|
dmaddr = dst_maddr
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def repr_rpc(
|
def repr_rpc(self) -> str:
|
||||||
self,
|
|
||||||
) -> str:
|
|
||||||
# TODO: how to show the transport interchange fmt?
|
# TODO: how to show the transport interchange fmt?
|
||||||
# codec: str = self.chan.transport.codec_key
|
# codec: str = self.chan.transport.codec_key
|
||||||
outcome_str: str = self.repr_outcome(
|
outcome_str: str = self.repr_outcome(
|
||||||
|
@ -980,6 +1009,27 @@ class Context:
|
||||||
f'{self._nsf}() -> {outcome_str}:'
|
f'{self._nsf}() -> {outcome_str}:'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def repr_caller(self) -> str:
|
||||||
|
ci: CallerInfo|None = self._caller_info
|
||||||
|
if ci:
|
||||||
|
return (
|
||||||
|
f'{ci.caller_nsp}()'
|
||||||
|
# f'|_api: {ci.api_nsp}'
|
||||||
|
)
|
||||||
|
|
||||||
|
return '<UNKNOWN caller-frame>'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def repr_api(self) -> str:
|
||||||
|
# ci: CallerInfo|None = self._caller_info
|
||||||
|
# if ci:
|
||||||
|
# return (
|
||||||
|
# f'{ci.api_nsp}()\n'
|
||||||
|
# )
|
||||||
|
|
||||||
|
return 'Portal.open_context()'
|
||||||
|
|
||||||
async def cancel(
|
async def cancel(
|
||||||
self,
|
self,
|
||||||
timeout: float = 0.616,
|
timeout: float = 0.616,
|
||||||
|
@ -1184,8 +1234,9 @@ class Context:
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE: in one way streaming this only happens on the
|
# NOTE: in one way streaming this only happens on the
|
||||||
# caller side inside `Actor.start_remote_task()` so if you try
|
# parent-ctx-task side (on the side that calls
|
||||||
# to send a stop from the caller to the callee in the
|
# `Actor.start_remote_task()`) so if you try to send
|
||||||
|
# a stop from the caller to the callee in the
|
||||||
# single-direction-stream case you'll get a lookup error
|
# single-direction-stream case you'll get a lookup error
|
||||||
# currently.
|
# currently.
|
||||||
ctx: Context = actor.get_context(
|
ctx: Context = actor.get_context(
|
||||||
|
@ -1850,6 +1901,19 @@ class Context:
|
||||||
send_chan: trio.MemorySendChannel = self._send_chan
|
send_chan: trio.MemorySendChannel = self._send_chan
|
||||||
nsf: NamespacePath = self._nsf
|
nsf: NamespacePath = self._nsf
|
||||||
|
|
||||||
|
side: str = self.side
|
||||||
|
if side == 'child':
|
||||||
|
assert not self._portal
|
||||||
|
peer_side: str = self.peer_side(side)
|
||||||
|
|
||||||
|
flow_body: str = (
|
||||||
|
f'<= peer {peer_side!r}: {from_uid}\n'
|
||||||
|
f' |_<{nsf}()>\n\n'
|
||||||
|
|
||||||
|
f'=> {side!r}: {self._task}\n'
|
||||||
|
f' |_<{self.repr_api} @ {self.repr_caller}>\n\n'
|
||||||
|
)
|
||||||
|
|
||||||
re: Exception|None
|
re: Exception|None
|
||||||
if re := unpack_error(
|
if re := unpack_error(
|
||||||
msg,
|
msg,
|
||||||
|
@ -1860,18 +1924,10 @@ class Context:
|
||||||
else:
|
else:
|
||||||
log_meth = log.runtime
|
log_meth = log.runtime
|
||||||
|
|
||||||
side: str = self.side
|
|
||||||
|
|
||||||
peer_side: str = self.peer_side(side)
|
|
||||||
|
|
||||||
log_meth(
|
log_meth(
|
||||||
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
|
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
|
||||||
|
|
||||||
f'<= peer {peer_side!r}: {from_uid}\n'
|
f'{flow_body}'
|
||||||
f' |_ {nsf}()\n\n'
|
|
||||||
|
|
||||||
f'=> {side!r} cid: {cid}\n'
|
|
||||||
f' |_{self._task}\n\n'
|
|
||||||
|
|
||||||
f'{pformat(re)}\n'
|
f'{pformat(re)}\n'
|
||||||
)
|
)
|
||||||
|
@ -1884,30 +1940,27 @@ class Context:
|
||||||
# or `RemoteActorError`).
|
# or `RemoteActorError`).
|
||||||
self._maybe_cancel_and_set_remote_error(re)
|
self._maybe_cancel_and_set_remote_error(re)
|
||||||
|
|
||||||
# XXX only case where returning early is fine!
|
# TODO: expose as mod func instead!
|
||||||
structfmt = pretty_struct.Struct.pformat
|
structfmt = pretty_struct.Struct.pformat
|
||||||
if self._in_overrun:
|
if self._in_overrun:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Queueing OVERRUN msg on caller task:\n'
|
f'Queueing OVERRUN msg on caller task:\n\n'
|
||||||
f'<= peer: {from_uid}\n'
|
|
||||||
f' |_ {nsf}()\n\n'
|
|
||||||
|
|
||||||
f'=> cid: {cid}\n'
|
f'{flow_body}'
|
||||||
f' |_{self._task}\n\n'
|
|
||||||
|
|
||||||
f'{structfmt(msg)}\n'
|
f'{structfmt(msg)}\n'
|
||||||
)
|
)
|
||||||
self._overflow_q.append(msg)
|
self._overflow_q.append(msg)
|
||||||
|
|
||||||
|
# XXX NOTE XXX
|
||||||
|
# overrun is the ONLY case where returning early is fine!
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Delivering msg from IPC ctx:\n'
|
f'Delivering msg from IPC ctx:\n\n'
|
||||||
f'<= {from_uid}\n'
|
|
||||||
f' |_ {nsf}()\n\n'
|
|
||||||
|
|
||||||
f'=> {self._task}\n'
|
f'{flow_body}'
|
||||||
f' |_cid={self.cid}\n\n'
|
|
||||||
|
|
||||||
f'{structfmt(msg)}\n'
|
f'{structfmt(msg)}\n'
|
||||||
)
|
)
|
||||||
|
@ -1939,6 +1992,7 @@ class Context:
|
||||||
f'cid: {self.cid}\n'
|
f'cid: {self.cid}\n'
|
||||||
'Failed to deliver msg:\n'
|
'Failed to deliver msg:\n'
|
||||||
f'send_chan: {send_chan}\n\n'
|
f'send_chan: {send_chan}\n\n'
|
||||||
|
|
||||||
f'{pformat(msg)}\n'
|
f'{pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
@ -2092,6 +2146,12 @@ async def open_context_from_portal(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
|
# denote this frame as a "runtime frame" for stack
|
||||||
|
# introspection where we report the caller code in logging
|
||||||
|
# and error message content.
|
||||||
|
# NOTE: 2 bc of the wrapping `@acm`
|
||||||
|
__runtimeframe__: int = 2 # noqa
|
||||||
|
|
||||||
# conduct target func method structural checks
|
# conduct target func method structural checks
|
||||||
if not inspect.iscoroutinefunction(func) and (
|
if not inspect.iscoroutinefunction(func) and (
|
||||||
getattr(func, '_tractor_contex_function', False)
|
getattr(func, '_tractor_contex_function', False)
|
||||||
|
@ -2119,6 +2179,8 @@ async def open_context_from_portal(
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
|
|
||||||
|
portal=portal,
|
||||||
|
|
||||||
# NOTE: it's imporant to expose this since you might
|
# NOTE: it's imporant to expose this since you might
|
||||||
# get the case where the parent who opened the context does
|
# get the case where the parent who opened the context does
|
||||||
# not open a stream until after some slow startup/init
|
# not open a stream until after some slow startup/init
|
||||||
|
@ -2129,13 +2191,17 @@ async def open_context_from_portal(
|
||||||
# place..
|
# place..
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
# ASAP, so that `Context.side: str` can be determined for
|
|
||||||
# logging / tracing / debug!
|
|
||||||
ctx._portal: Portal = portal
|
|
||||||
|
|
||||||
assert ctx._remote_func_type == 'context'
|
assert ctx._remote_func_type == 'context'
|
||||||
msg: Started = await ctx._recv_chan.receive()
|
assert ctx._caller_info
|
||||||
|
|
||||||
|
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
||||||
|
# `Started`-msg any cancellation triggered
|
||||||
|
# in `._maybe_cancel_and_set_remote_error()` will
|
||||||
|
# NOT actually cancel the below line!
|
||||||
|
# -> it's expected that if there is an error in this phase of
|
||||||
|
# the dialog, the `Error` msg should be raised from the `msg`
|
||||||
|
# handling block below.
|
||||||
|
msg: Started = await ctx._recv_chan.receive()
|
||||||
try:
|
try:
|
||||||
# the "first" value here is delivered by the callee's
|
# the "first" value here is delivered by the callee's
|
||||||
# ``Context.started()`` call.
|
# ``Context.started()`` call.
|
||||||
|
@ -2145,6 +2211,7 @@ async def open_context_from_portal(
|
||||||
|
|
||||||
# except KeyError as src_error:
|
# except KeyError as src_error:
|
||||||
except AttributeError as src_error:
|
except AttributeError as src_error:
|
||||||
|
log.exception('Raising from unexpected msg!\n')
|
||||||
_raise_from_no_key_in_msg(
|
_raise_from_no_key_in_msg(
|
||||||
ctx=ctx,
|
ctx=ctx,
|
||||||
msg=msg,
|
msg=msg,
|
||||||
|
@ -2570,7 +2637,6 @@ async def open_context_from_portal(
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def mk_context(
|
def mk_context(
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
cid: str,
|
cid: str,
|
||||||
|
@ -2592,6 +2658,10 @@ def mk_context(
|
||||||
recv_chan: trio.MemoryReceiveChannel
|
recv_chan: trio.MemoryReceiveChannel
|
||||||
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
|
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
|
||||||
|
|
||||||
|
# TODO: only scan caller-info if log level so high!
|
||||||
|
from .devx._code import find_caller_info
|
||||||
|
caller_info: CallerInfo|None = find_caller_info()
|
||||||
|
|
||||||
ctx = Context(
|
ctx = Context(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -2600,6 +2670,7 @@ def mk_context(
|
||||||
_recv_chan=recv_chan,
|
_recv_chan=recv_chan,
|
||||||
_nsf=nsf,
|
_nsf=nsf,
|
||||||
_task=trio.lowlevel.current_task(),
|
_task=trio.lowlevel.current_task(),
|
||||||
|
_caller_info=caller_info,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
# TODO: we can drop the old placeholder yah?
|
# TODO: we can drop the old placeholder yah?
|
||||||
|
@ -2610,7 +2681,11 @@ def mk_context(
|
||||||
|
|
||||||
def context(func: Callable) -> Callable:
|
def context(func: Callable) -> Callable:
|
||||||
'''
|
'''
|
||||||
Mark an async function as a streaming routine with ``@context``.
|
Mark an (async) function as an SC-supervised, inter-`Actor`,
|
||||||
|
child-`trio.Task`, IPC endpoint otherwise known more
|
||||||
|
colloquially as a (RPC) "context".
|
||||||
|
|
||||||
|
Functions annotated the fundamental IPC endpoint type offered by `tractor`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# TODO: apply whatever solution ``mypy`` ends up picking for this:
|
# TODO: apply whatever solution ``mypy`` ends up picking for this:
|
||||||
|
|
|
@ -935,7 +935,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
|
||||||
def _raise_from_no_key_in_msg(
|
def _raise_from_no_key_in_msg(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
src_err: KeyError,
|
src_err: AttributeError,
|
||||||
log: StackLevelAdapter, # caller specific `log` obj
|
log: StackLevelAdapter, # caller specific `log` obj
|
||||||
|
|
||||||
expect_msg: str = Yield,
|
expect_msg: str = Yield,
|
||||||
|
@ -994,7 +994,7 @@ def _raise_from_no_key_in_msg(
|
||||||
ctx.chan,
|
ctx.chan,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
|
|
||||||
) from None
|
) from src_err
|
||||||
|
|
||||||
# `MsgStream` termination msg.
|
# `MsgStream` termination msg.
|
||||||
# TODO: does it make more sense to pack
|
# TODO: does it make more sense to pack
|
||||||
|
|
|
@ -314,8 +314,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
header = await self.recv_stream.receive_exactly(4)
|
header: bytes = await self.recv_stream.receive_exactly(4)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
ValueError,
|
ValueError,
|
||||||
ConnectionResetError,
|
ConnectionResetError,
|
||||||
|
@ -337,8 +336,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
size, = struct.unpack("<I", header)
|
size, = struct.unpack("<I", header)
|
||||||
|
|
||||||
log.transport(f'received header {size}') # type: ignore
|
log.transport(f'received header {size}') # type: ignore
|
||||||
|
msg_bytes: bytes = await self.recv_stream.receive_exactly(size)
|
||||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
|
||||||
|
|
||||||
log.transport(f"received {msg_bytes}") # type: ignore
|
log.transport(f"received {msg_bytes}") # type: ignore
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -161,17 +161,18 @@ class Portal:
|
||||||
self._expect_result = await self.actor.start_remote_task(
|
self._expect_result = await self.actor.start_remote_task(
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=NamespacePath(f'{ns}:{func}'),
|
nsf=NamespacePath(f'{ns}:{func}'),
|
||||||
kwargs=kwargs
|
kwargs=kwargs,
|
||||||
|
portal=self,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _return_once(
|
async def _return_once(
|
||||||
self,
|
self,
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
|
||||||
) -> dict[str, Any]:
|
) -> Return:
|
||||||
|
|
||||||
assert ctx._remote_func_type == 'asyncfunc' # single response
|
assert ctx._remote_func_type == 'asyncfunc' # single response
|
||||||
msg: dict = await ctx._recv_chan.receive()
|
msg: Return = await ctx._recv_chan.receive()
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
async def result(self) -> Any:
|
async def result(self) -> Any:
|
||||||
|
@ -247,6 +248,8 @@ class Portal:
|
||||||
purpose.
|
purpose.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__runtimeframe__: int = 1 # noqa
|
||||||
|
|
||||||
chan: Channel = self.channel
|
chan: Channel = self.channel
|
||||||
if not chan.connected():
|
if not chan.connected():
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
@ -324,16 +327,18 @@ class Portal:
|
||||||
internals!
|
internals!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__runtimeframe__: int = 1 # noqa
|
||||||
nsf = NamespacePath(
|
nsf = NamespacePath(
|
||||||
f'{namespace_path}:{function_name}'
|
f'{namespace_path}:{function_name}'
|
||||||
)
|
)
|
||||||
ctx = await self.actor.start_remote_task(
|
ctx: Context = await self.actor.start_remote_task(
|
||||||
chan=self.channel,
|
chan=self.channel,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
|
portal=self,
|
||||||
)
|
)
|
||||||
ctx._portal = self
|
ctx._portal: Portal = self
|
||||||
msg = await self._return_once(ctx)
|
msg: Return = await self._return_once(ctx)
|
||||||
return _unwrap_msg(
|
return _unwrap_msg(
|
||||||
msg,
|
msg,
|
||||||
self.channel,
|
self.channel,
|
||||||
|
@ -384,6 +389,7 @@ class Portal:
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
|
portal=self,
|
||||||
)
|
)
|
||||||
ctx._portal = self
|
ctx._portal = self
|
||||||
return _unwrap_msg(
|
return _unwrap_msg(
|
||||||
|
@ -398,6 +404,14 @@ class Portal:
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> AsyncGenerator[MsgStream, None]:
|
) -> AsyncGenerator[MsgStream, None]:
|
||||||
|
'''
|
||||||
|
Legacy one-way streaming API.
|
||||||
|
|
||||||
|
TODO: re-impl on top `Portal.open_context()` + an async gen
|
||||||
|
around `Context.open_stream()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
__runtimeframe__: int = 1 # noqa
|
||||||
|
|
||||||
if not inspect.isasyncgenfunction(async_gen_func):
|
if not inspect.isasyncgenfunction(async_gen_func):
|
||||||
if not (
|
if not (
|
||||||
|
@ -411,6 +425,7 @@ class Portal:
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=NamespacePath.from_ref(async_gen_func),
|
nsf=NamespacePath.from_ref(async_gen_func),
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
|
portal=self,
|
||||||
)
|
)
|
||||||
ctx._portal = self
|
ctx._portal = self
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
# on our debugger lock state.
|
# on our debugger lock state.
|
||||||
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
|
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||||
|
|
||||||
# mark top most level process as root actor
|
# mark top most level process as root actor
|
||||||
_state._runtime_vars['_is_root'] = True
|
_state._runtime_vars['_is_root'] = True
|
||||||
|
|
|
@ -814,7 +814,7 @@ async def process_messages(
|
||||||
# should use it?
|
# should use it?
|
||||||
# https://github.com/python-trio/trio/issues/467
|
# https://github.com/python-trio/trio/issues/467
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Entering IPC msg loop:\n'
|
'Entering RPC msg loop:\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
)
|
)
|
||||||
|
@ -876,7 +876,7 @@ async def process_messages(
|
||||||
# XXX NOTE XXX don't start entire actor
|
# XXX NOTE XXX don't start entire actor
|
||||||
# runtime cancellation if this actor is
|
# runtime cancellation if this actor is
|
||||||
# currently in debug mode!
|
# currently in debug mode!
|
||||||
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
|
pdb_complete: trio.Event|None = _debug.DebugStatus.repl_release
|
||||||
if pdb_complete:
|
if pdb_complete:
|
||||||
await pdb_complete.wait()
|
await pdb_complete.wait()
|
||||||
|
|
||||||
|
@ -1073,7 +1073,7 @@ async def process_messages(
|
||||||
log.exception(message)
|
log.exception(message)
|
||||||
raise RuntimeError(message)
|
raise RuntimeError(message)
|
||||||
|
|
||||||
log.runtime(
|
log.transport(
|
||||||
'Waiting on next IPC msg from\n'
|
'Waiting on next IPC msg from\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
|
|
|
@ -267,10 +267,13 @@ class Actor:
|
||||||
self._listeners: list[trio.abc.Listener] = []
|
self._listeners: list[trio.abc.Listener] = []
|
||||||
self._parent_chan: Channel|None = None
|
self._parent_chan: Channel|None = None
|
||||||
self._forkserver_info: tuple|None = None
|
self._forkserver_info: tuple|None = None
|
||||||
|
|
||||||
|
# track each child/sub-actor in it's locally
|
||||||
|
# supervising nursery
|
||||||
self._actoruid2nursery: dict[
|
self._actoruid2nursery: dict[
|
||||||
tuple[str, str],
|
tuple[str, str], # sub-`Actor.uid`
|
||||||
ActorNursery|None,
|
ActorNursery|None,
|
||||||
] = {} # type: ignore # noqa
|
] = {}
|
||||||
|
|
||||||
# when provided, init the registry addresses property from
|
# when provided, init the registry addresses property from
|
||||||
# input via the validator.
|
# input via the validator.
|
||||||
|
@ -659,12 +662,18 @@ class Actor:
|
||||||
|
|
||||||
# TODO: NEEEDS TO BE TESTED!
|
# TODO: NEEEDS TO BE TESTED!
|
||||||
# actually, no idea if this ever even enters.. XD
|
# actually, no idea if this ever even enters.. XD
|
||||||
|
#
|
||||||
|
# XXX => YES IT DOES, when i was testing ctl-c
|
||||||
|
# from broken debug TTY locking due to
|
||||||
|
# msg-spec races on application using RunVar...
|
||||||
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
||||||
if (
|
if (
|
||||||
pdb_user_uid
|
pdb_user_uid
|
||||||
and local_nursery
|
and local_nursery
|
||||||
):
|
):
|
||||||
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
|
entry: tuple|None = local_nursery._children.get(
|
||||||
|
tuple(pdb_user_uid)
|
||||||
|
)
|
||||||
if entry:
|
if entry:
|
||||||
proc: trio.Process
|
proc: trio.Process
|
||||||
_, proc, _ = entry
|
_, proc, _ = entry
|
||||||
|
@ -674,10 +683,10 @@ class Actor:
|
||||||
and poll() is None
|
and poll() is None
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Root actor reports no-more-peers, BUT '
|
'Root actor reports no-more-peers, BUT\n'
|
||||||
'a DISCONNECTED child still has the debug '
|
'a DISCONNECTED child still has the debug '
|
||||||
'lock!\n'
|
'lock!\n\n'
|
||||||
f'root uid: {self.uid}\n'
|
# f'root uid: {self.uid}\n'
|
||||||
f'last disconnected child uid: {uid}\n'
|
f'last disconnected child uid: {uid}\n'
|
||||||
f'locking child uid: {pdb_user_uid}\n'
|
f'locking child uid: {pdb_user_uid}\n'
|
||||||
)
|
)
|
||||||
|
@ -703,9 +712,8 @@ class Actor:
|
||||||
# if a now stale local task has the TTY lock still
|
# if a now stale local task has the TTY lock still
|
||||||
# we cancel it to allow servicing other requests for
|
# we cancel it to allow servicing other requests for
|
||||||
# the lock.
|
# the lock.
|
||||||
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
|
|
||||||
if (
|
if (
|
||||||
db_cs
|
(db_cs := pdb_lock.get_locking_task_cs())
|
||||||
and not db_cs.cancel_called
|
and not db_cs.cancel_called
|
||||||
and uid == pdb_user_uid
|
and uid == pdb_user_uid
|
||||||
):
|
):
|
||||||
|
@ -742,7 +750,7 @@ class Actor:
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Ignoring invalid IPC ctx msg!\n\n'
|
'Ignoring invalid IPC ctx msg!\n\n'
|
||||||
f'<= sender: {uid}\n'
|
f'<= sender: {uid}\n\n'
|
||||||
# XXX don't need right since it's always in msg?
|
# XXX don't need right since it's always in msg?
|
||||||
# f'=> cid: {cid}\n\n'
|
# f'=> cid: {cid}\n\n'
|
||||||
|
|
||||||
|
@ -796,7 +804,7 @@ class Actor:
|
||||||
cid,
|
cid,
|
||||||
# side,
|
# side,
|
||||||
)]
|
)]
|
||||||
log.runtime(
|
log.debug(
|
||||||
f'Retreived cached IPC ctx for\n'
|
f'Retreived cached IPC ctx for\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
f'cid:{cid}\n'
|
f'cid:{cid}\n'
|
||||||
|
@ -835,10 +843,14 @@ class Actor:
|
||||||
nsf: NamespacePath,
|
nsf: NamespacePath,
|
||||||
kwargs: dict,
|
kwargs: dict,
|
||||||
|
|
||||||
|
# determines `Context.side: str`
|
||||||
|
portal: Portal|None = None,
|
||||||
|
|
||||||
# IPC channel config
|
# IPC channel config
|
||||||
msg_buffer_size: int|None = None,
|
msg_buffer_size: int|None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
load_nsf: bool = False,
|
load_nsf: bool = False,
|
||||||
|
ack_timeout: float = 3,
|
||||||
|
|
||||||
) -> Context:
|
) -> Context:
|
||||||
'''
|
'''
|
||||||
|
@ -863,10 +875,12 @@ class Actor:
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
|
ctx._portal = portal
|
||||||
|
|
||||||
if (
|
if (
|
||||||
'self' in nsf
|
'self' in nsf
|
||||||
or not load_nsf
|
or
|
||||||
|
not load_nsf
|
||||||
):
|
):
|
||||||
ns, _, func = nsf.partition(':')
|
ns, _, func = nsf.partition(':')
|
||||||
else:
|
else:
|
||||||
|
@ -874,42 +888,29 @@ class Actor:
|
||||||
# -[ ] but, how to do `self:<Actor.meth>`??
|
# -[ ] but, how to do `self:<Actor.meth>`??
|
||||||
ns, func = nsf.to_tuple()
|
ns, func = nsf.to_tuple()
|
||||||
|
|
||||||
log.runtime(
|
msg = msgtypes.Start(
|
||||||
'Sending cmd to\n'
|
|
||||||
f'peer: {chan.uid} => \n'
|
|
||||||
'\n'
|
|
||||||
f'=> {ns}.{func}({kwargs})\n'
|
|
||||||
)
|
|
||||||
await chan.send(
|
|
||||||
msgtypes.Start(
|
|
||||||
ns=ns,
|
ns=ns,
|
||||||
func=func,
|
func=func,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
uid=self.uid,
|
uid=self.uid,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
)
|
)
|
||||||
|
log.runtime(
|
||||||
|
'Sending RPC start msg\n\n'
|
||||||
|
f'=> peer: {chan.uid}\n'
|
||||||
|
f' |_ {ns}.{func}({kwargs})\n'
|
||||||
)
|
)
|
||||||
# {'cmd': (
|
await chan.send(msg)
|
||||||
# ns,
|
|
||||||
# func,
|
|
||||||
# kwargs,
|
|
||||||
# self.uid,
|
|
||||||
# cid,
|
|
||||||
# )}
|
|
||||||
# )
|
|
||||||
|
|
||||||
# Wait on first response msg and validate; this should be
|
|
||||||
# immediate.
|
|
||||||
# first_msg: dict = await ctx._recv_chan.receive()
|
|
||||||
# functype: str = first_msg.get('functype')
|
|
||||||
|
|
||||||
|
# NOTE wait on first `StartAck` response msg and validate;
|
||||||
|
# this should be immediate and does not (yet) wait for the
|
||||||
|
# remote child task to sync via `Context.started()`.
|
||||||
|
with trio.fail_after(ack_timeout):
|
||||||
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
||||||
try:
|
try:
|
||||||
functype: str = first_msg.functype
|
functype: str = first_msg.functype
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise unpack_error(first_msg, chan)
|
raise unpack_error(first_msg, chan)
|
||||||
# if 'error' in first_msg:
|
|
||||||
# raise unpack_error(first_msg, chan)
|
|
||||||
|
|
||||||
if functype not in (
|
if functype not in (
|
||||||
'asyncfunc',
|
'asyncfunc',
|
||||||
|
@ -917,7 +918,7 @@ class Actor:
|
||||||
'context',
|
'context',
|
||||||
):
|
):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'{first_msg} is an invalid response packet?'
|
f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx._remote_func_type = functype
|
ctx._remote_func_type = functype
|
||||||
|
@ -1162,7 +1163,7 @@ class Actor:
|
||||||
|
|
||||||
# kill any debugger request task to avoid deadlock
|
# kill any debugger request task to avoid deadlock
|
||||||
# with the root actor in this tree
|
# with the root actor in this tree
|
||||||
dbcs = _debug.Lock._debugger_request_cs
|
dbcs = _debug.DebugStatus.req_cs
|
||||||
if dbcs is not None:
|
if dbcs is not None:
|
||||||
msg += (
|
msg += (
|
||||||
'>> Cancelling active debugger request..\n'
|
'>> Cancelling active debugger request..\n'
|
||||||
|
@ -1237,9 +1238,9 @@ class Actor:
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# NOTE: during msging race conditions this will often
|
# NOTE: during msging race conditions this will often
|
||||||
# emit, some examples:
|
# emit, some examples:
|
||||||
# - callee returns a result before cancel-msg/ctxc-raised
|
# - child returns a result before cancel-msg/ctxc-raised
|
||||||
# - callee self raises ctxc before caller send request,
|
# - child self raises ctxc before parent send request,
|
||||||
# - callee errors prior to cancel req.
|
# - child errors prior to cancel req.
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Cancel request invalid, RPC task already completed?\n\n'
|
'Cancel request invalid, RPC task already completed?\n\n'
|
||||||
f'<= canceller: {requesting_uid}\n\n'
|
f'<= canceller: {requesting_uid}\n\n'
|
||||||
|
@ -1305,12 +1306,12 @@ class Actor:
|
||||||
f'|_{ctx}\n'
|
f'|_{ctx}\n'
|
||||||
)
|
)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Waiting on RPC task to cancel\n'
|
'Waiting on RPC task to cancel\n\n'
|
||||||
f'{flow_info}'
|
f'{flow_info}'
|
||||||
)
|
)
|
||||||
await is_complete.wait()
|
await is_complete.wait()
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Sucessfully cancelled RPC task\n'
|
f'Sucessfully cancelled RPC task\n\n'
|
||||||
f'{flow_info}'
|
f'{flow_info}'
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
@ -1536,8 +1537,8 @@ async def async_main(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
# on our debugger lock state.
|
# on our debugger state.
|
||||||
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
|
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||||
|
|
||||||
is_registered: bool = False
|
is_registered: bool = False
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -30,11 +30,16 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
_current_actor: Actor|None = None # type: ignore # noqa
|
_current_actor: Actor|None = None # type: ignore # noqa
|
||||||
_last_actor_terminated: Actor|None = None
|
_last_actor_terminated: Actor|None = None
|
||||||
|
|
||||||
|
# TODO: mk this a `msgspec.Struct`!
|
||||||
_runtime_vars: dict[str, Any] = {
|
_runtime_vars: dict[str, Any] = {
|
||||||
'_debug_mode': False,
|
'_debug_mode': False,
|
||||||
'_is_root': False,
|
'_is_root': False,
|
||||||
'_root_mailbox': (None, None),
|
'_root_mailbox': (None, None),
|
||||||
'_registry_addrs': [],
|
'_registry_addrs': [],
|
||||||
|
|
||||||
|
# for `breakpoint()` support
|
||||||
|
'use_greenback': False,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -61,7 +66,7 @@ def current_actor(
|
||||||
err_on_no_runtime
|
err_on_no_runtime
|
||||||
and _current_actor is None
|
and _current_actor is None
|
||||||
):
|
):
|
||||||
msg: str = 'No local actor has been initialized yet'
|
msg: str = 'No local actor has been initialized yet?\n'
|
||||||
from ._exceptions import NoRuntime
|
from ._exceptions import NoRuntime
|
||||||
|
|
||||||
if last := last_actor():
|
if last := last_actor():
|
||||||
|
@ -74,8 +79,8 @@ def current_actor(
|
||||||
# this process.
|
# this process.
|
||||||
else:
|
else:
|
||||||
msg += (
|
msg += (
|
||||||
'No last actor found?\n'
|
# 'No last actor found?\n'
|
||||||
'Did you forget to open one of:\n\n'
|
'\nDid you forget to call one of,\n'
|
||||||
'- `tractor.open_root_actor()`\n'
|
'- `tractor.open_root_actor()`\n'
|
||||||
'- `tractor.open_nursery()`\n'
|
'- `tractor.open_nursery()`\n'
|
||||||
)
|
)
|
||||||
|
|
|
@ -377,14 +377,17 @@ class MsgStream(trio.abc.Channel):
|
||||||
# await rx_chan.aclose()
|
# await rx_chan.aclose()
|
||||||
|
|
||||||
if not self._eoc:
|
if not self._eoc:
|
||||||
log.cancel(
|
message: str = (
|
||||||
'Stream closed by self before it received an EoC?\n'
|
f'Context stream closed by {self._ctx.side!r}\n'
|
||||||
'Setting eoc manually..\n..'
|
|
||||||
)
|
|
||||||
self._eoc: bool = trio.EndOfChannel(
|
|
||||||
f'Context stream closed by self({self._ctx.side})\n'
|
|
||||||
f'|_{self}\n'
|
f'|_{self}\n'
|
||||||
)
|
)
|
||||||
|
log.cancel(
|
||||||
|
'Stream self-closed before receiving EoC\n\n'
|
||||||
|
+
|
||||||
|
message
|
||||||
|
)
|
||||||
|
self._eoc = trio.EndOfChannel(message)
|
||||||
|
|
||||||
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
||||||
# => NO, DEFINITELY NOT! <=
|
# => NO, DEFINITELY NOT! <=
|
||||||
# if we're a bi-dir ``MsgStream`` BECAUSE this same
|
# if we're a bi-dir ``MsgStream`` BECAUSE this same
|
||||||
|
|
|
@ -131,7 +131,12 @@ class ActorNursery:
|
||||||
"main task" besides the runtime.
|
"main task" besides the runtime.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
__runtimeframe__: int = 1 # noqa
|
||||||
|
loglevel: str = (
|
||||||
|
loglevel
|
||||||
|
or self._actor.loglevel
|
||||||
|
or get_loglevel()
|
||||||
|
)
|
||||||
|
|
||||||
# configure and pass runtime state
|
# configure and pass runtime state
|
||||||
_rtv = _state._runtime_vars.copy()
|
_rtv = _state._runtime_vars.copy()
|
||||||
|
@ -209,6 +214,7 @@ class ActorNursery:
|
||||||
the actor is terminated.
|
the actor is terminated.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__runtimeframe__: int = 1 # noqa
|
||||||
mod_path: str = fn.__module__
|
mod_path: str = fn.__module__
|
||||||
|
|
||||||
if name is None:
|
if name is None:
|
||||||
|
@ -257,6 +263,7 @@ class ActorNursery:
|
||||||
directly without any far end graceful ``trio`` cancellation.
|
directly without any far end graceful ``trio`` cancellation.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__runtimeframe__: int = 1 # noqa
|
||||||
self.cancelled = True
|
self.cancelled = True
|
||||||
|
|
||||||
# TODO: impl a repr for spawn more compact
|
# TODO: impl a repr for spawn more compact
|
||||||
|
|
|
@ -27,7 +27,6 @@ from ._debug import (
|
||||||
pause as pause,
|
pause as pause,
|
||||||
pause_from_sync as pause_from_sync,
|
pause_from_sync as pause_from_sync,
|
||||||
shield_sigint_handler as shield_sigint_handler,
|
shield_sigint_handler as shield_sigint_handler,
|
||||||
MultiActorPdb as MultiActorPdb,
|
|
||||||
open_crash_handler as open_crash_handler,
|
open_crash_handler as open_crash_handler,
|
||||||
maybe_open_crash_handler as maybe_open_crash_handler,
|
maybe_open_crash_handler as maybe_open_crash_handler,
|
||||||
post_mortem as post_mortem,
|
post_mortem as post_mortem,
|
||||||
|
|
|
@ -0,0 +1,177 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Tools for code-object annotation, introspection and mutation
|
||||||
|
as it pertains to improving the grok-ability of our runtime!
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
import inspect
|
||||||
|
# import msgspec
|
||||||
|
# from pprint import pformat
|
||||||
|
from types import (
|
||||||
|
FrameType,
|
||||||
|
FunctionType,
|
||||||
|
MethodType,
|
||||||
|
# CodeType,
|
||||||
|
)
|
||||||
|
from typing import (
|
||||||
|
# Any,
|
||||||
|
Callable,
|
||||||
|
# TYPE_CHECKING,
|
||||||
|
Type,
|
||||||
|
)
|
||||||
|
|
||||||
|
from tractor.msg import (
|
||||||
|
pretty_struct,
|
||||||
|
NamespacePath,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: yeah, i don't love this and we should prolly just
|
||||||
|
# write a decorator that actually keeps a stupid ref to the func
|
||||||
|
# obj..
|
||||||
|
def get_class_from_frame(fr: FrameType) -> (
|
||||||
|
FunctionType
|
||||||
|
|MethodType
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Attempt to get the function (or method) reference
|
||||||
|
from a given `FrameType`.
|
||||||
|
|
||||||
|
Verbatim from an SO:
|
||||||
|
https://stackoverflow.com/a/2220759
|
||||||
|
|
||||||
|
'''
|
||||||
|
args, _, _, value_dict = inspect.getargvalues(fr)
|
||||||
|
|
||||||
|
# we check the first parameter for the frame function is
|
||||||
|
# named 'self'
|
||||||
|
if (
|
||||||
|
len(args)
|
||||||
|
and
|
||||||
|
# TODO: other cases for `@classmethod` etc..?)
|
||||||
|
args[0] == 'self'
|
||||||
|
):
|
||||||
|
# in that case, 'self' will be referenced in value_dict
|
||||||
|
instance: object = value_dict.get('self')
|
||||||
|
if instance:
|
||||||
|
# return its class
|
||||||
|
return getattr(
|
||||||
|
instance,
|
||||||
|
'__class__',
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# return None otherwise
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def func_ref_from_frame(
|
||||||
|
frame: FrameType,
|
||||||
|
) -> Callable:
|
||||||
|
func_name: str = frame.f_code.co_name
|
||||||
|
try:
|
||||||
|
return frame.f_globals[func_name]
|
||||||
|
except KeyError:
|
||||||
|
cls: Type|None = get_class_from_frame(frame)
|
||||||
|
if cls:
|
||||||
|
return getattr(
|
||||||
|
cls,
|
||||||
|
func_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: move all this into new `.devx._code`!
|
||||||
|
# -[ ] prolly create a `@runtime_api` dec?
|
||||||
|
# -[ ] ^- make it capture and/or accept buncha optional
|
||||||
|
# meta-data like a fancier version of `@pdbp.hideframe`.
|
||||||
|
#
|
||||||
|
class CallerInfo(pretty_struct.Struct):
|
||||||
|
rt_fi: inspect.FrameInfo
|
||||||
|
call_frame: FrameType
|
||||||
|
|
||||||
|
@property
|
||||||
|
def api_func_ref(self) -> Callable|None:
|
||||||
|
return func_ref_from_frame(self.rt_fi.frame)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def api_nsp(self) -> NamespacePath|None:
|
||||||
|
func: FunctionType = self.api_func_ref
|
||||||
|
if func:
|
||||||
|
return NamespacePath.from_ref(func)
|
||||||
|
|
||||||
|
return '<unknown>'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def caller_func_ref(self) -> Callable|None:
|
||||||
|
return func_ref_from_frame(self.call_frame)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def caller_nsp(self) -> NamespacePath|None:
|
||||||
|
func: FunctionType = self.caller_func_ref
|
||||||
|
if func:
|
||||||
|
return NamespacePath.from_ref(func)
|
||||||
|
|
||||||
|
return '<unknown>'
|
||||||
|
|
||||||
|
|
||||||
|
def find_caller_info(
|
||||||
|
dunder_var: str = '__runtimeframe__',
|
||||||
|
iframes:int = 1,
|
||||||
|
check_frame_depth: bool = True,
|
||||||
|
|
||||||
|
) -> CallerInfo|None:
|
||||||
|
'''
|
||||||
|
Scan up the callstack for a frame with a `dunder_var: str` variable
|
||||||
|
and return the `iframes` frames above it.
|
||||||
|
|
||||||
|
By default we scan for a `__runtimeframe__` scope var which
|
||||||
|
denotes a `tractor` API above which (one frame up) is "user
|
||||||
|
app code" which "called into" the `tractor` method or func.
|
||||||
|
|
||||||
|
TODO: ex with `Portal.open_context()`
|
||||||
|
|
||||||
|
'''
|
||||||
|
# TODO: use this instead?
|
||||||
|
# https://docs.python.org/3/library/inspect.html#inspect.getouterframes
|
||||||
|
frames: list[inspect.FrameInfo] = inspect.stack()
|
||||||
|
for fi in frames:
|
||||||
|
assert (
|
||||||
|
fi.function
|
||||||
|
==
|
||||||
|
fi.frame.f_code.co_name
|
||||||
|
)
|
||||||
|
this_frame: FrameType = fi.frame
|
||||||
|
dunder_val: int|None = this_frame.f_locals.get(dunder_var)
|
||||||
|
if dunder_val:
|
||||||
|
go_up_iframes: int = (
|
||||||
|
dunder_val # could be 0 or `True` i guess?
|
||||||
|
or
|
||||||
|
iframes
|
||||||
|
)
|
||||||
|
rt_frame: FrameType = fi.frame
|
||||||
|
call_frame = rt_frame
|
||||||
|
for i in range(go_up_iframes):
|
||||||
|
call_frame = call_frame.f_back
|
||||||
|
|
||||||
|
return CallerInfo(
|
||||||
|
rt_fi=fi,
|
||||||
|
call_frame=call_frame,
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
File diff suppressed because it is too large
Load Diff
|
@ -23,12 +23,31 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
|
||||||
disjoint, parallel executing tasks in separate actors.
|
disjoint, parallel executing tasks in separate actors.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
import multiprocessing as mp
|
||||||
from signal import (
|
from signal import (
|
||||||
signal,
|
signal,
|
||||||
SIGUSR1,
|
SIGUSR1,
|
||||||
)
|
)
|
||||||
|
import traceback
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
from tractor import (
|
||||||
|
_state,
|
||||||
|
log as logmod,
|
||||||
|
)
|
||||||
|
|
||||||
|
log = logmod.get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor._spawn import ProcessType
|
||||||
|
from tractor import (
|
||||||
|
Actor,
|
||||||
|
ActorNursery,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@trio.lowlevel.disable_ki_protection
|
@trio.lowlevel.disable_ki_protection
|
||||||
def dump_task_tree() -> None:
|
def dump_task_tree() -> None:
|
||||||
|
@ -41,9 +60,15 @@ def dump_task_tree() -> None:
|
||||||
recurse_child_tasks=True
|
recurse_child_tasks=True
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log = get_console_log('cancel')
|
log = get_console_log(
|
||||||
|
name=__name__,
|
||||||
|
level='cancel',
|
||||||
|
)
|
||||||
|
actor: Actor = _state.current_actor()
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f'Dumping `stackscope` tree:\n\n'
|
f'Dumping `stackscope` tree for actor\n'
|
||||||
|
f'{actor.name}: {actor}\n'
|
||||||
|
f' |_{mp.current_process()}\n\n'
|
||||||
f'{tree_str}\n'
|
f'{tree_str}\n'
|
||||||
)
|
)
|
||||||
# import logging
|
# import logging
|
||||||
|
@ -56,8 +81,13 @@ def dump_task_tree() -> None:
|
||||||
# ).exception("Error printing task tree")
|
# ).exception("Error printing task tree")
|
||||||
|
|
||||||
|
|
||||||
def signal_handler(sig: int, frame: object) -> None:
|
def signal_handler(
|
||||||
import traceback
|
sig: int,
|
||||||
|
frame: object,
|
||||||
|
|
||||||
|
relay_to_subs: bool = True,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
try:
|
try:
|
||||||
trio.lowlevel.current_trio_token(
|
trio.lowlevel.current_trio_token(
|
||||||
).run_sync_soon(dump_task_tree)
|
).run_sync_soon(dump_task_tree)
|
||||||
|
@ -65,6 +95,26 @@ def signal_handler(sig: int, frame: object) -> None:
|
||||||
# not in async context -- print a normal traceback
|
# not in async context -- print a normal traceback
|
||||||
traceback.print_stack()
|
traceback.print_stack()
|
||||||
|
|
||||||
|
if not relay_to_subs:
|
||||||
|
return
|
||||||
|
|
||||||
|
an: ActorNursery
|
||||||
|
for an in _state.current_actor()._actoruid2nursery.values():
|
||||||
|
|
||||||
|
subproc: ProcessType
|
||||||
|
subactor: Actor
|
||||||
|
for subactor, subproc, _ in an._children.values():
|
||||||
|
log.pdb(
|
||||||
|
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
|
||||||
|
f'{subactor}\n'
|
||||||
|
f' |_{subproc}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if isinstance(subproc, trio.Process):
|
||||||
|
subproc.send_signal(sig)
|
||||||
|
|
||||||
|
elif isinstance(subproc, mp.Process):
|
||||||
|
subproc._send_signal(sig)
|
||||||
|
|
||||||
|
|
||||||
def enable_stack_on_sig(
|
def enable_stack_on_sig(
|
||||||
|
@ -82,3 +132,6 @@ def enable_stack_on_sig(
|
||||||
# NOTE: not the above can be triggered from
|
# NOTE: not the above can be triggered from
|
||||||
# a (xonsh) shell using:
|
# a (xonsh) shell using:
|
||||||
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
||||||
|
#
|
||||||
|
# for example if you were looking to trace a `pytest` run
|
||||||
|
# kill -SIGUSR1 @$(pgrep -f 'pytest')
|
||||||
|
|
|
@ -33,25 +33,29 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
# from contextvars import (
|
from contextvars import (
|
||||||
# ContextVar,
|
ContextVar,
|
||||||
# Token,
|
Token,
|
||||||
# )
|
)
|
||||||
import textwrap
|
import textwrap
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
Type,
|
Type,
|
||||||
|
TYPE_CHECKING,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
from msgspec import msgpack
|
from msgspec import (
|
||||||
from trio.lowlevel import (
|
msgpack,
|
||||||
RunVar,
|
Raw,
|
||||||
RunVarToken,
|
|
||||||
)
|
)
|
||||||
|
# from trio.lowlevel import (
|
||||||
|
# RunVar,
|
||||||
|
# RunVarToken,
|
||||||
|
# )
|
||||||
# TODO: see notes below from @mikenerone..
|
# TODO: see notes below from @mikenerone..
|
||||||
# from tricycle import TreeVar
|
# from tricycle import TreeVar
|
||||||
|
|
||||||
|
@ -62,6 +66,9 @@ from tractor.msg.types import (
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor._context import Context
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
||||||
|
@ -157,24 +164,6 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
lib: ModuleType = msgspec
|
lib: ModuleType = msgspec
|
||||||
|
|
||||||
# TODO: a sub-decoder system as well?
|
|
||||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
|
||||||
# see related comments in `.msg.types`
|
|
||||||
# _payload_decs: (
|
|
||||||
# dict[
|
|
||||||
# str,
|
|
||||||
# msgpack.Decoder,
|
|
||||||
# ]
|
|
||||||
# |None
|
|
||||||
# ) = None
|
|
||||||
# OR
|
|
||||||
# ) = {
|
|
||||||
# # pre-seed decoders for std-py-type-set for use when
|
|
||||||
# # `MsgType.pld == None|Any`.
|
|
||||||
# None: msgpack.Decoder(Any),
|
|
||||||
# Any: msgpack.Decoder(Any),
|
|
||||||
# }
|
|
||||||
|
|
||||||
# TODO: use `functools.cached_property` for these ?
|
# TODO: use `functools.cached_property` for these ?
|
||||||
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
||||||
@property
|
@property
|
||||||
|
@ -210,7 +199,25 @@ class MsgCodec(Struct):
|
||||||
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
||||||
return self._dec.decode(msg)
|
return self._dec.decode(msg)
|
||||||
|
|
||||||
# TODO: do we still want to try and support the sub-decoder with
|
# TODO: a sub-decoder system as well?
|
||||||
|
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||||
|
# see related comments in `.msg.types`
|
||||||
|
# _payload_decs: (
|
||||||
|
# dict[
|
||||||
|
# str,
|
||||||
|
# msgpack.Decoder,
|
||||||
|
# ]
|
||||||
|
# |None
|
||||||
|
# ) = None
|
||||||
|
# OR
|
||||||
|
# ) = {
|
||||||
|
# # pre-seed decoders for std-py-type-set for use when
|
||||||
|
# # `MsgType.pld == None|Any`.
|
||||||
|
# None: msgpack.Decoder(Any),
|
||||||
|
# Any: msgpack.Decoder(Any),
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# -[ ] do we still want to try and support the sub-decoder with
|
||||||
# `.Raw` technique in the case that the `Generic` approach gives
|
# `.Raw` technique in the case that the `Generic` approach gives
|
||||||
# future grief?
|
# future grief?
|
||||||
#
|
#
|
||||||
|
@ -429,6 +436,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
|
||||||
#
|
#
|
||||||
_def_tractor_codec: MsgCodec = mk_codec(
|
_def_tractor_codec: MsgCodec = mk_codec(
|
||||||
ipc_pld_spec=Any,
|
ipc_pld_spec=Any,
|
||||||
|
|
||||||
|
# TODO: use this for debug mode locking prot?
|
||||||
|
# ipc_pld_spec=Raw,
|
||||||
)
|
)
|
||||||
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
|
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
|
||||||
# IPC msging codec used by the transport layer when doing
|
# IPC msging codec used by the transport layer when doing
|
||||||
|
@ -462,11 +472,9 @@ _def_tractor_codec: MsgCodec = mk_codec(
|
||||||
|
|
||||||
# TODO: STOP USING THIS, since it's basically a global and won't
|
# TODO: STOP USING THIS, since it's basically a global and won't
|
||||||
# allow sub-IPC-ctxs to limit the msg-spec however desired..
|
# allow sub-IPC-ctxs to limit the msg-spec however desired..
|
||||||
_ctxvar_MsgCodec: MsgCodec = RunVar(
|
# _ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||||
|
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
|
||||||
'msgspec_codec',
|
'msgspec_codec',
|
||||||
|
|
||||||
# TODO: move this to our new `Msg`-spec!
|
|
||||||
# default=_def_msgspec_codec,
|
|
||||||
default=_def_tractor_codec,
|
default=_def_tractor_codec,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -475,23 +483,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||||
def apply_codec(
|
def apply_codec(
|
||||||
codec: MsgCodec,
|
codec: MsgCodec,
|
||||||
|
|
||||||
|
ctx: Context|None = None,
|
||||||
|
|
||||||
) -> MsgCodec:
|
) -> MsgCodec:
|
||||||
'''
|
'''
|
||||||
Dynamically apply a `MsgCodec` to the current task's
|
Dynamically apply a `MsgCodec` to the current task's runtime
|
||||||
runtime context such that all IPC msgs are processed
|
context such that all (of a certain class of payload
|
||||||
with it for that task.
|
containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
|
||||||
|
processed with it for that task.
|
||||||
|
|
||||||
|
Uses a `contextvars.ContextVar` to ensure the scope of any
|
||||||
|
codec setting matches the current `Context` or
|
||||||
|
`._rpc.process_messages()` feeder task's prior setting without
|
||||||
|
mutating any surrounding scope.
|
||||||
|
|
||||||
|
When a `ctx` is supplied, only mod its `Context.pld_codec`.
|
||||||
|
|
||||||
Uses a `tricycle.TreeVar` to ensure the scope of the codec
|
|
||||||
matches the `@cm` block and DOES NOT change to the original
|
matches the `@cm` block and DOES NOT change to the original
|
||||||
(default) value in new tasks (as it does for `ContextVar`).
|
(default) value in new tasks (as it does for `ContextVar`).
|
||||||
|
|
||||||
See the docs:
|
|
||||||
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
|
||||||
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
orig: MsgCodec = _ctxvar_MsgCodec.get()
|
|
||||||
|
if ctx is not None:
|
||||||
|
var: ContextVar = ctx._var_pld_codec
|
||||||
|
else:
|
||||||
|
# use IPC channel-connection "global" codec
|
||||||
|
var: ContextVar = _ctxvar_MsgCodec
|
||||||
|
|
||||||
|
orig: MsgCodec = var.get()
|
||||||
|
|
||||||
assert orig is not codec
|
assert orig is not codec
|
||||||
if codec.pld_spec is None:
|
if codec.pld_spec is None:
|
||||||
breakpoint()
|
breakpoint()
|
||||||
|
@ -500,22 +521,25 @@ def apply_codec(
|
||||||
'Applying new msg-spec codec\n\n'
|
'Applying new msg-spec codec\n\n'
|
||||||
f'{codec}\n'
|
f'{codec}\n'
|
||||||
)
|
)
|
||||||
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
|
token: Token = var.set(codec)
|
||||||
|
|
||||||
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
|
# ?TODO? for TreeVar approach which copies from the
|
||||||
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
# cancel-scope of the prior value, NOT the prior task
|
||||||
# try:
|
# See the docs:
|
||||||
|
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
||||||
|
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
|
||||||
|
# ^- see docs for @cm `.being()` API
|
||||||
# with _ctxvar_MsgCodec.being(codec):
|
# with _ctxvar_MsgCodec.being(codec):
|
||||||
# new = _ctxvar_MsgCodec.get()
|
# new = _ctxvar_MsgCodec.get()
|
||||||
# assert new is codec
|
# assert new is codec
|
||||||
# yield codec
|
# yield codec
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield _ctxvar_MsgCodec.get()
|
yield var.get()
|
||||||
finally:
|
finally:
|
||||||
_ctxvar_MsgCodec.reset(token)
|
var.reset(token)
|
||||||
|
|
||||||
assert _ctxvar_MsgCodec.get() is orig
|
assert var.get() is orig
|
||||||
log.info(
|
log.info(
|
||||||
'Reverted to last msg-spec codec\n\n'
|
'Reverted to last msg-spec codec\n\n'
|
||||||
f'{orig}\n'
|
f'{orig}\n'
|
||||||
|
|
|
@ -76,9 +76,11 @@ class NamespacePath(str):
|
||||||
return self._ref
|
return self._ref
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _mk_fqnp(ref: type | object) -> tuple[str, str]:
|
def _mk_fqnp(
|
||||||
|
ref: type|object,
|
||||||
|
) -> tuple[str, str]:
|
||||||
'''
|
'''
|
||||||
Generate a minial ``str`` pair which describes a python
|
Generate a minial `str` pair which describes a python
|
||||||
object's namespace path and object/type name.
|
object's namespace path and object/type name.
|
||||||
|
|
||||||
In more precise terms something like:
|
In more precise terms something like:
|
||||||
|
@ -87,10 +89,9 @@ class NamespacePath(str):
|
||||||
of THIS type XD
|
of THIS type XD
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if (
|
if isfunction(ref):
|
||||||
isfunction(ref)
|
|
||||||
):
|
|
||||||
name: str = getattr(ref, '__name__')
|
name: str = getattr(ref, '__name__')
|
||||||
|
mod_name: str = ref.__module__
|
||||||
|
|
||||||
elif ismethod(ref):
|
elif ismethod(ref):
|
||||||
# build out the path manually i guess..?
|
# build out the path manually i guess..?
|
||||||
|
@ -99,15 +100,19 @@ class NamespacePath(str):
|
||||||
type(ref.__self__).__name__,
|
type(ref.__self__).__name__,
|
||||||
ref.__func__.__name__,
|
ref.__func__.__name__,
|
||||||
])
|
])
|
||||||
|
mod_name: str = ref.__self__.__module__
|
||||||
|
|
||||||
else: # object or other?
|
else: # object or other?
|
||||||
# isinstance(ref, object)
|
# isinstance(ref, object)
|
||||||
# and not isfunction(ref)
|
# and not isfunction(ref)
|
||||||
name: str = type(ref).__name__
|
name: str = type(ref).__name__
|
||||||
|
mod_name: str = ref.__module__
|
||||||
|
|
||||||
|
# TODO: return static value direactly?
|
||||||
|
#
|
||||||
# fully qualified namespace path, tuple.
|
# fully qualified namespace path, tuple.
|
||||||
fqnp: tuple[str, str] = (
|
fqnp: tuple[str, str] = (
|
||||||
ref.__module__,
|
mod_name,
|
||||||
name,
|
name,
|
||||||
)
|
)
|
||||||
return fqnp
|
return fqnp
|
||||||
|
|
Loading…
Reference in New Issue