Compare commits

..

No commits in common. "9b4f5804706b2c7effcf9c39d579c3c2bdb41e25" and "b209990d045c9069b11279916dc529bbe3152b0b" have entirely different histories.

17 changed files with 560 additions and 1445 deletions

View File

@ -14,20 +14,19 @@ from typing import (
from contextvars import ( from contextvars import (
Context, Context,
) )
# from inspect import Parameter
from msgspec import ( from msgspec import (
structs, structs,
msgpack, msgpack,
# defstruct,
Struct, Struct,
ValidationError, ValidationError,
) )
import pytest import pytest
import tractor import tractor
from tractor import ( from tractor import _state
_state,
MsgTypeError,
)
from tractor.msg import ( from tractor.msg import (
_codec, _codec,
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
@ -48,6 +47,21 @@ from tractor.msg.types import (
import trio import trio
def test_msg_spec_xor_pld_spec():
'''
If the `.msg.types.Msg`-set is overridden, we
can't also support a `Msg.pld` spec.
'''
# apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types.
with pytest.raises(RuntimeError):
mk_codec(
ipc_msg_spec=Any,
ipc_pld_spec=NamespacePath,
)
def mk_custom_codec( def mk_custom_codec(
pld_spec: Union[Type]|Any, pld_spec: Union[Type]|Any,
add_hooks: bool, add_hooks: bool,
@ -120,9 +134,7 @@ def mk_custom_codec(
f'{uid}\n' f'{uid}\n'
'FAILED DECODE\n' 'FAILED DECODE\n'
f'type-> {obj_type}\n' f'type-> {obj_type}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n\n' f'obj-arg-> `{obj}`: {type(obj)}\n'
f'current codec:\n'
f'{current_codec()}\n'
) )
# TODO: figure out the ignore subsys for this! # TODO: figure out the ignore subsys for this!
# -[ ] option whether to defense-relay backc the msg # -[ ] option whether to defense-relay backc the msg
@ -397,9 +409,7 @@ async def send_back_values(
pld_spec=ipc_pld_spec, pld_spec=ipc_pld_spec,
add_hooks=add_hooks, add_hooks=add_hooks,
) )
with ( with apply_codec(nsp_codec) as codec:
apply_codec(nsp_codec) as codec,
):
chk_codec_applied( chk_codec_applied(
expect_codec=nsp_codec, expect_codec=nsp_codec,
enter_value=codec, enter_value=codec,
@ -449,7 +459,7 @@ async def send_back_values(
# XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL # XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
# `str` handling! or special debug mode IPC # `str` handling! or special debug mode IPC
# msgs! # msgs!
await tractor.pause() # await tractor.pause()
raise RuntimeError( raise RuntimeError(
f'NOT-EXPECTED able to roundtrip value given spec:\n' f'NOT-EXPECTED able to roundtrip value given spec:\n'
@ -460,8 +470,7 @@ async def send_back_values(
break # move on to streaming block.. break # move on to streaming block..
except tractor.MsgTypeError: except tractor.MsgTypeError:
await tractor.pause() # await tractor.pause()
if expect_send: if expect_send:
raise RuntimeError( raise RuntimeError(
f'EXPECTED to `.started()` value given spec:\n' f'EXPECTED to `.started()` value given spec:\n'
@ -643,42 +652,12 @@ def test_codec_hooks_mod(
pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec) pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)
# XXX should raise an mte (`MsgTypeError`)
# when `add_codec_hooks == False` bc the input
# `expect_ipc_send` kwarg has a nsp which can't be
# serialized!
#
# TODO:can we ensure this happens from the
# `Return`-side (aka the sub) as well?
if not add_codec_hooks:
try:
async with p.open_context(
send_back_values,
expect_debug=debug_mode,
pld_spec_type_strs=pld_spec_type_strs,
add_hooks=add_codec_hooks,
started_msg_bytes=nsp_codec.encode(expected_started),
# XXX NOTE bc we send a `NamespacePath` in this kwarg
expect_ipc_send=expect_ipc_send,
) as (ctx, first):
pytest.fail('ctx should fail to open without custom enc_hook!?')
# this test passes bc we can go no further!
except MsgTypeError:
# teardown nursery
await p.cancel_actor()
return
# TODO: send the original nsp here and # TODO: send the original nsp here and
# test with `limit_msg_spec()` above? # test with `limit_msg_spec()` above?
# await tractor.pause() # await tractor.pause()
print('PARENT opening IPC ctx!\n') print('PARENT opening IPC ctx!\n')
async with ( async with (
# XXX should raise an mte (`MsgTypeError`)
# when `add_codec_hooks == False`..
p.open_context( p.open_context(
send_back_values, send_back_values,
expect_debug=debug_mode, expect_debug=debug_mode,

View File

@ -26,7 +26,6 @@ disjoint, parallel executing tasks in separate actors.
from __future__ import annotations from __future__ import annotations
from collections import deque from collections import deque
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from contextvars import ContextVar
from dataclasses import ( from dataclasses import (
dataclass, dataclass,
field, field,
@ -57,7 +56,6 @@ from ._exceptions import (
) )
from .log import get_logger from .log import get_logger
from .msg import ( from .msg import (
_codec,
Error, Error,
MsgType, MsgType,
MsgCodec, MsgCodec,
@ -82,9 +80,6 @@ if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
from ._runtime import Actor from ._runtime import Actor
from ._ipc import MsgTransport from ._ipc import MsgTransport
from .devx._code import (
CallerInfo,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -504,18 +499,6 @@ class Context:
_started_called: bool = False _started_called: bool = False
_stream_opened: bool = False _stream_opened: bool = False
_stream: MsgStream|None = None _stream: MsgStream|None = None
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
'pld_codec',
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
)
@property
def pld_codec(self) -> MsgCodec|None:
return self._pld_codec_var.get()
# caller of `Portal.open_context()` for
# logging purposes mostly
_caller_info: CallerInfo|None = None
# overrun handling machinery # overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote # NOTE: none of this provides "backpressure" to the remote
@ -542,7 +525,6 @@ class Context:
# TODO: figure out how we can enforce this without losing our minds.. # TODO: figure out how we can enforce this without losing our minds..
_strict_started: bool = False _strict_started: bool = False
_cancel_on_msgerr: bool = True
def __str__(self) -> str: def __str__(self) -> str:
ds: str = '=' ds: str = '='
@ -875,7 +857,6 @@ class Context:
# TODO: never do this right? # TODO: never do this right?
# if self._remote_error: # if self._remote_error:
# return # return
peer_side: str = self.peer_side(self.side)
# XXX: denote and set the remote side's error so that # XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this # after we cancel whatever task is the opener of this
@ -883,15 +864,14 @@ class Context:
# appropriately. # appropriately.
log.runtime( log.runtime(
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= {peer_side!r}: {self.chan.uid}\n' f'<= remote ctx uid: {self.chan.uid}\n'
f'=> {self.side!r}\n\n' f'=>{error}'
f'{error}'
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
# self-cancel (ack) or, # self-cancel (ack) or,
# peer propagated remote cancellation. # peer propagated remote cancellation.
msgerr: bool = False msgtyperr: bool = False
if isinstance(error, ContextCancelled): if isinstance(error, ContextCancelled):
whom: str = ( whom: str = (
@ -904,7 +884,7 @@ class Context:
) )
elif isinstance(error, MsgTypeError): elif isinstance(error, MsgTypeError):
msgerr = True msgtyperr = True
peer_side: str = self.peer_side(self.side) peer_side: str = self.peer_side(self.side)
log.error( log.error(
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n' f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
@ -955,24 +935,13 @@ class Context:
and not self._is_self_cancelled() and not self._is_self_cancelled()
and not cs.cancel_called and not cs.cancel_called
and not cs.cancelled_caught and not cs.cancelled_caught
and ( and not msgtyperr
msgerr
and
# NOTE: allow user to config not cancelling the
# local scope on `MsgTypeError`s
self._cancel_on_msgerr
)
): ):
# TODO: it'd sure be handy to inject our own # TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
log.cancel('Cancelling local `.open_context()` scope!')
self._scope.cancel() self._scope.cancel()
else:
log.cancel('NOT cancelling local `.open_context()` scope!')
# TODO: maybe we should also call `._res_scope.cancel()` if it # TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs? # exists to support cancelling any drain loop hangs?
@ -997,7 +966,9 @@ class Context:
dmaddr = dst_maddr dmaddr = dst_maddr
@property @property
def repr_rpc(self) -> str: def repr_rpc(
self,
) -> str:
# TODO: how to show the transport interchange fmt? # TODO: how to show the transport interchange fmt?
# codec: str = self.chan.transport.codec_key # codec: str = self.chan.transport.codec_key
outcome_str: str = self.repr_outcome( outcome_str: str = self.repr_outcome(
@ -1009,27 +980,6 @@ class Context:
f'{self._nsf}() -> {outcome_str}:' f'{self._nsf}() -> {outcome_str}:'
) )
@property
def repr_caller(self) -> str:
ci: CallerInfo|None = self._caller_info
if ci:
return (
f'{ci.caller_nsp}()'
# f'|_api: {ci.api_nsp}'
)
return '<UNKNOWN caller-frame>'
@property
def repr_api(self) -> str:
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
# f'{ci.api_nsp}()\n'
# )
return 'Portal.open_context()'
async def cancel( async def cancel(
self, self,
timeout: float = 0.616, timeout: float = 0.616,
@ -1234,9 +1184,8 @@ class Context:
) )
# NOTE: in one way streaming this only happens on the # NOTE: in one way streaming this only happens on the
# parent-ctx-task side (on the side that calls # caller side inside `Actor.start_remote_task()` so if you try
# `Actor.start_remote_task()`) so if you try to send # to send a stop from the caller to the callee in the
# a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error # single-direction-stream case you'll get a lookup error
# currently. # currently.
ctx: Context = actor.get_context( ctx: Context = actor.get_context(
@ -1901,19 +1850,6 @@ class Context:
send_chan: trio.MemorySendChannel = self._send_chan send_chan: trio.MemorySendChannel = self._send_chan
nsf: NamespacePath = self._nsf nsf: NamespacePath = self._nsf
side: str = self.side
if side == 'child':
assert not self._portal
peer_side: str = self.peer_side(side)
flow_body: str = (
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n'
f'=> {side!r}: {self._task}\n'
f' |_<{self.repr_api} @ {self.repr_caller}>\n\n'
)
re: Exception|None re: Exception|None
if re := unpack_error( if re := unpack_error(
msg, msg,
@ -1924,10 +1860,18 @@ class Context:
else: else:
log_meth = log.runtime log_meth = log.runtime
side: str = self.side
peer_side: str = self.peer_side(side)
log_meth( log_meth(
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n' f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
f'{flow_body}' f'<= peer {peer_side!r}: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> {side!r} cid: {cid}\n'
f' |_{self._task}\n\n'
f'{pformat(re)}\n' f'{pformat(re)}\n'
) )
@ -1940,27 +1884,30 @@ class Context:
# or `RemoteActorError`). # or `RemoteActorError`).
self._maybe_cancel_and_set_remote_error(re) self._maybe_cancel_and_set_remote_error(re)
# TODO: expose as mod func instead! # XXX only case where returning early is fine!
structfmt = pretty_struct.Struct.pformat structfmt = pretty_struct.Struct.pformat
if self._in_overrun: if self._in_overrun:
log.warning( log.warning(
f'Queueing OVERRUN msg on caller task:\n\n' f'Queueing OVERRUN msg on caller task:\n'
f'<= peer: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'{flow_body}' f'=> cid: {cid}\n'
f' |_{self._task}\n\n'
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
self._overflow_q.append(msg) self._overflow_q.append(msg)
# XXX NOTE XXX
# overrun is the ONLY case where returning early is fine!
return False return False
try: try:
log.runtime( log.runtime(
f'Delivering msg from IPC ctx:\n\n' f'Delivering msg from IPC ctx:\n'
f'<= {from_uid}\n'
f' |_ {nsf}()\n\n'
f'{flow_body}' f'=> {self._task}\n'
f' |_cid={self.cid}\n\n'
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
@ -1992,7 +1939,6 @@ class Context:
f'cid: {self.cid}\n' f'cid: {self.cid}\n'
'Failed to deliver msg:\n' 'Failed to deliver msg:\n'
f'send_chan: {send_chan}\n\n' f'send_chan: {send_chan}\n\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
return False return False
@ -2146,12 +2092,6 @@ async def open_context_from_portal(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# denote this frame as a "runtime frame" for stack
# introspection where we report the caller code in logging
# and error message content.
# NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa
# conduct target func method structural checks # conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and ( if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False) getattr(func, '_tractor_contex_function', False)
@ -2179,8 +2119,6 @@ async def open_context_from_portal(
nsf=nsf, nsf=nsf,
kwargs=kwargs, kwargs=kwargs,
portal=portal,
# NOTE: it's imporant to expose this since you might # NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does # get the case where the parent who opened the context does
# not open a stream until after some slow startup/init # not open a stream until after some slow startup/init
@ -2191,17 +2129,13 @@ async def open_context_from_portal(
# place.. # place..
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
assert ctx._remote_func_type == 'context' # ASAP, so that `Context.side: str` can be determined for
assert ctx._caller_info # logging / tracing / debug!
ctx._portal: Portal = portal
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the assert ctx._remote_func_type == 'context'
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
# NOT actually cancel the below line!
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
msg: Started = await ctx._recv_chan.receive() msg: Started = await ctx._recv_chan.receive()
try: try:
# the "first" value here is delivered by the callee's # the "first" value here is delivered by the callee's
# ``Context.started()`` call. # ``Context.started()`` call.
@ -2211,7 +2145,6 @@ async def open_context_from_portal(
# except KeyError as src_error: # except KeyError as src_error:
except AttributeError as src_error: except AttributeError as src_error:
log.exception('Raising from unexpected msg!\n')
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=ctx, ctx=ctx,
msg=msg, msg=msg,
@ -2637,6 +2570,7 @@ async def open_context_from_portal(
None, None,
) )
def mk_context( def mk_context(
chan: Channel, chan: Channel,
cid: str, cid: str,
@ -2658,10 +2592,6 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high!
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
ctx = Context( ctx = Context(
chan=chan, chan=chan,
cid=cid, cid=cid,
@ -2670,7 +2600,6 @@ def mk_context(
_recv_chan=recv_chan, _recv_chan=recv_chan,
_nsf=nsf, _nsf=nsf,
_task=trio.lowlevel.current_task(), _task=trio.lowlevel.current_task(),
_caller_info=caller_info,
**kwargs, **kwargs,
) )
# TODO: we can drop the old placeholder yah? # TODO: we can drop the old placeholder yah?
@ -2681,11 +2610,7 @@ def mk_context(
def context(func: Callable) -> Callable: def context(func: Callable) -> Callable:
''' '''
Mark an (async) function as an SC-supervised, inter-`Actor`, Mark an async function as a streaming routine with ``@context``.
child-`trio.Task`, IPC endpoint otherwise known more
colloquially as a (RPC) "context".
Functions annotated the fundamental IPC endpoint type offered by `tractor`.
''' '''
# TODO: apply whatever solution ``mypy`` ends up picking for this: # TODO: apply whatever solution ``mypy`` ends up picking for this:

View File

@ -935,7 +935,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
def _raise_from_no_key_in_msg( def _raise_from_no_key_in_msg(
ctx: Context, ctx: Context,
msg: MsgType, msg: MsgType,
src_err: AttributeError, src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj log: StackLevelAdapter, # caller specific `log` obj
expect_msg: str = Yield, expect_msg: str = Yield,
@ -994,7 +994,7 @@ def _raise_from_no_key_in_msg(
ctx.chan, ctx.chan,
hide_tb=hide_tb, hide_tb=hide_tb,
) from src_err ) from None
# `MsgStream` termination msg. # `MsgStream` termination msg.
# TODO: does it make more sense to pack # TODO: does it make more sense to pack

View File

@ -314,7 +314,8 @@ class MsgpackTCPStream(MsgTransport):
while True: while True:
try: try:
header: bytes = await self.recv_stream.receive_exactly(4) header = await self.recv_stream.receive_exactly(4)
except ( except (
ValueError, ValueError,
ConnectionResetError, ConnectionResetError,
@ -336,7 +337,8 @@ class MsgpackTCPStream(MsgTransport):
size, = struct.unpack("<I", header) size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore log.transport(f'received header {size}') # type: ignore
msg_bytes: bytes = await self.recv_stream.receive_exactly(size)
msg_bytes = await self.recv_stream.receive_exactly(size)
log.transport(f"received {msg_bytes}") # type: ignore log.transport(f"received {msg_bytes}") # type: ignore
try: try:

View File

@ -161,18 +161,17 @@ class Portal:
self._expect_result = await self.actor.start_remote_task( self._expect_result = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=NamespacePath(f'{ns}:{func}'), nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs, kwargs=kwargs
portal=self,
) )
async def _return_once( async def _return_once(
self, self,
ctx: Context, ctx: Context,
) -> Return: ) -> dict[str, Any]:
assert ctx._remote_func_type == 'asyncfunc' # single response assert ctx._remote_func_type == 'asyncfunc' # single response
msg: Return = await ctx._recv_chan.receive() msg: dict = await ctx._recv_chan.receive()
return msg return msg
async def result(self) -> Any: async def result(self) -> Any:
@ -248,8 +247,6 @@ class Portal:
purpose. purpose.
''' '''
__runtimeframe__: int = 1 # noqa
chan: Channel = self.channel chan: Channel = self.channel
if not chan.connected(): if not chan.connected():
log.runtime( log.runtime(
@ -327,18 +324,16 @@ class Portal:
internals! internals!
''' '''
__runtimeframe__: int = 1 # noqa
nsf = NamespacePath( nsf = NamespacePath(
f'{namespace_path}:{function_name}' f'{namespace_path}:{function_name}'
) )
ctx: Context = await self.actor.start_remote_task( ctx = await self.actor.start_remote_task(
chan=self.channel, chan=self.channel,
nsf=nsf, nsf=nsf,
kwargs=kwargs, kwargs=kwargs,
portal=self,
) )
ctx._portal: Portal = self ctx._portal = self
msg: Return = await self._return_once(ctx) msg = await self._return_once(ctx)
return _unwrap_msg( return _unwrap_msg(
msg, msg,
self.channel, self.channel,
@ -389,7 +384,6 @@ class Portal:
self.channel, self.channel,
nsf=nsf, nsf=nsf,
kwargs=kwargs, kwargs=kwargs,
portal=self,
) )
ctx._portal = self ctx._portal = self
return _unwrap_msg( return _unwrap_msg(
@ -404,14 +398,6 @@ class Portal:
**kwargs, **kwargs,
) -> AsyncGenerator[MsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
'''
Legacy one-way streaming API.
TODO: re-impl on top `Portal.open_context()` + an async gen
around `Context.open_stream()`.
'''
__runtimeframe__: int = 1 # noqa
if not inspect.isasyncgenfunction(async_gen_func): if not inspect.isasyncgenfunction(async_gen_func):
if not ( if not (
@ -425,7 +411,6 @@ class Portal:
self.channel, self.channel,
nsf=NamespacePath.from_ref(async_gen_func), nsf=NamespacePath.from_ref(async_gen_func),
kwargs=kwargs, kwargs=kwargs,
portal=self,
) )
ctx._portal = self ctx._portal = self

View File

@ -135,7 +135,7 @@ async def open_root_actor(
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger lock state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT) _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor # mark top most level process as root actor
_state._runtime_vars['_is_root'] = True _state._runtime_vars['_is_root'] = True

View File

@ -814,7 +814,7 @@ async def process_messages(
# should use it? # should use it?
# https://github.com/python-trio/trio/issues/467 # https://github.com/python-trio/trio/issues/467
log.runtime( log.runtime(
'Entering RPC msg loop:\n' 'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'|_{chan}\n' f'|_{chan}\n'
) )
@ -876,7 +876,7 @@ async def process_messages(
# XXX NOTE XXX don't start entire actor # XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is # runtime cancellation if this actor is
# currently in debug mode! # currently in debug mode!
pdb_complete: trio.Event|None = _debug.DebugStatus.repl_release pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete: if pdb_complete:
await pdb_complete.wait() await pdb_complete.wait()
@ -1073,7 +1073,7 @@ async def process_messages(
log.exception(message) log.exception(message)
raise RuntimeError(message) raise RuntimeError(message)
log.transport( log.runtime(
'Waiting on next IPC msg from\n' 'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'|_{chan}\n' f'|_{chan}\n'

View File

@ -267,13 +267,10 @@ class Actor:
self._listeners: list[trio.abc.Listener] = [] self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel|None = None self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None self._forkserver_info: tuple|None = None
# track each child/sub-actor in it's locally
# supervising nursery
self._actoruid2nursery: dict[ self._actoruid2nursery: dict[
tuple[str, str], # sub-`Actor.uid` tuple[str, str],
ActorNursery|None, ActorNursery|None,
] = {} ] = {} # type: ignore # noqa
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
# input via the validator. # input via the validator.
@ -662,18 +659,12 @@ class Actor:
# TODO: NEEEDS TO BE TESTED! # TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD # actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if ( if (
pdb_user_uid pdb_user_uid
and local_nursery and local_nursery
): ):
entry: tuple|None = local_nursery._children.get( entry: tuple|None = local_nursery._children.get(pdb_user_uid)
tuple(pdb_user_uid)
)
if entry: if entry:
proc: trio.Process proc: trio.Process
_, proc, _ = entry _, proc, _ = entry
@ -683,10 +674,10 @@ class Actor:
and poll() is None and poll() is None
): ):
log.cancel( log.cancel(
'Root actor reports no-more-peers, BUT\n' 'Root actor reports no-more-peers, BUT '
'a DISCONNECTED child still has the debug ' 'a DISCONNECTED child still has the debug '
'lock!\n\n' 'lock!\n'
# f'root uid: {self.uid}\n' f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n' f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n' f'locking child uid: {pdb_user_uid}\n'
) )
@ -712,8 +703,9 @@ class Actor:
# if a now stale local task has the TTY lock still # if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for # we cancel it to allow servicing other requests for
# the lock. # the lock.
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if ( if (
(db_cs := pdb_lock.get_locking_task_cs()) db_cs
and not db_cs.cancel_called and not db_cs.cancel_called
and uid == pdb_user_uid and uid == pdb_user_uid
): ):
@ -750,7 +742,7 @@ class Actor:
except KeyError: except KeyError:
log.warning( log.warning(
'Ignoring invalid IPC ctx msg!\n\n' 'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n\n' f'<= sender: {uid}\n'
# XXX don't need right since it's always in msg? # XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n' # f'=> cid: {cid}\n\n'
@ -804,7 +796,7 @@ class Actor:
cid, cid,
# side, # side,
)] )]
log.debug( log.runtime(
f'Retreived cached IPC ctx for\n' f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'cid:{cid}\n' f'cid:{cid}\n'
@ -843,14 +835,10 @@ class Actor:
nsf: NamespacePath, nsf: NamespacePath,
kwargs: dict, kwargs: dict,
# determines `Context.side: str`
portal: Portal|None = None,
# IPC channel config # IPC channel config
msg_buffer_size: int|None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
load_nsf: bool = False, load_nsf: bool = False,
ack_timeout: float = 3,
) -> Context: ) -> Context:
''' '''
@ -875,12 +863,10 @@ class Actor:
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
ctx._portal = portal
if ( if (
'self' in nsf 'self' in nsf
or or not load_nsf
not load_nsf
): ):
ns, _, func = nsf.partition(':') ns, _, func = nsf.partition(':')
else: else:
@ -888,29 +874,42 @@ class Actor:
# -[ ] but, how to do `self:<Actor.meth>`?? # -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple() ns, func = nsf.to_tuple()
msg = msgtypes.Start( log.runtime(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
)
await chan.send(
msgtypes.Start(
ns=ns, ns=ns,
func=func, func=func,
kwargs=kwargs, kwargs=kwargs,
uid=self.uid, uid=self.uid,
cid=cid, cid=cid,
) )
log.runtime(
'Sending RPC start msg\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n'
) )
await chan.send(msg) # {'cmd': (
# ns,
# func,
# kwargs,
# self.uid,
# cid,
# )}
# )
# Wait on first response msg and validate; this should be
# immediate.
# first_msg: dict = await ctx._recv_chan.receive()
# functype: str = first_msg.get('functype')
# NOTE wait on first `StartAck` response msg and validate;
# this should be immediate and does not (yet) wait for the
# remote child task to sync via `Context.started()`.
with trio.fail_after(ack_timeout):
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive() first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
try: try:
functype: str = first_msg.functype functype: str = first_msg.functype
except AttributeError: except AttributeError:
raise unpack_error(first_msg, chan) raise unpack_error(first_msg, chan)
# if 'error' in first_msg:
# raise unpack_error(first_msg, chan)
if functype not in ( if functype not in (
'asyncfunc', 'asyncfunc',
@ -918,7 +917,7 @@ class Actor:
'context', 'context',
): ):
raise ValueError( raise ValueError(
f'Invalid `StartAck.functype: str = {first_msg!r}` ??' f'{first_msg} is an invalid response packet?'
) )
ctx._remote_func_type = functype ctx._remote_func_type = functype
@ -1163,7 +1162,7 @@ class Actor:
# kill any debugger request task to avoid deadlock # kill any debugger request task to avoid deadlock
# with the root actor in this tree # with the root actor in this tree
dbcs = _debug.DebugStatus.req_cs dbcs = _debug.Lock._debugger_request_cs
if dbcs is not None: if dbcs is not None:
msg += ( msg += (
'>> Cancelling active debugger request..\n' '>> Cancelling active debugger request..\n'
@ -1238,9 +1237,9 @@ class Actor:
except KeyError: except KeyError:
# NOTE: during msging race conditions this will often # NOTE: during msging race conditions this will often
# emit, some examples: # emit, some examples:
# - child returns a result before cancel-msg/ctxc-raised # - callee returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request, # - callee self raises ctxc before caller send request,
# - child errors prior to cancel req. # - callee errors prior to cancel req.
log.cancel( log.cancel(
'Cancel request invalid, RPC task already completed?\n\n' 'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n' f'<= canceller: {requesting_uid}\n\n'
@ -1303,15 +1302,15 @@ class Actor:
flow_info: str = ( flow_info: str = (
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_uid}\n'
f'=> ipc-parent: {parent_chan}\n' f'=> ipc-parent: {parent_chan}\n'
f'|_{ctx}\n' f' |_{ctx}\n'
) )
log.runtime( log.runtime(
'Waiting on RPC task to cancel\n\n' 'Waiting on RPC task to cancel\n'
f'{flow_info}' f'{flow_info}'
) )
await is_complete.wait() await is_complete.wait()
log.runtime( log.runtime(
f'Sucessfully cancelled RPC task\n\n' f'Sucessfully cancelled RPC task\n'
f'{flow_info}' f'{flow_info}'
) )
return True return True
@ -1537,8 +1536,8 @@ async def async_main(
''' '''
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger state. # on our debugger lock state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT) _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
is_registered: bool = False is_registered: bool = False
try: try:

View File

@ -30,16 +30,11 @@ if TYPE_CHECKING:
_current_actor: Actor|None = None # type: ignore # noqa _current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None _last_actor_terminated: Actor|None = None
# TODO: mk this a `msgspec.Struct`!
_runtime_vars: dict[str, Any] = { _runtime_vars: dict[str, Any] = {
'_debug_mode': False, '_debug_mode': False,
'_is_root': False, '_is_root': False,
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
# for `breakpoint()` support
'use_greenback': False,
} }
@ -66,7 +61,7 @@ def current_actor(
err_on_no_runtime err_on_no_runtime
and _current_actor is None and _current_actor is None
): ):
msg: str = 'No local actor has been initialized yet?\n' msg: str = 'No local actor has been initialized yet'
from ._exceptions import NoRuntime from ._exceptions import NoRuntime
if last := last_actor(): if last := last_actor():
@ -79,8 +74,8 @@ def current_actor(
# this process. # this process.
else: else:
msg += ( msg += (
# 'No last actor found?\n' 'No last actor found?\n'
'\nDid you forget to call one of,\n' 'Did you forget to open one of:\n\n'
'- `tractor.open_root_actor()`\n' '- `tractor.open_root_actor()`\n'
'- `tractor.open_nursery()`\n' '- `tractor.open_nursery()`\n'
) )

View File

@ -377,17 +377,14 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose() # await rx_chan.aclose()
if not self._eoc: if not self._eoc:
message: str = ( log.cancel(
f'Context stream closed by {self._ctx.side!r}\n' 'Stream closed by self before it received an EoC?\n'
'Setting eoc manually..\n..'
)
self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by self({self._ctx.side})\n'
f'|_{self}\n' f'|_{self}\n'
) )
log.cancel(
'Stream self-closed before receiving EoC\n\n'
+
message
)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <= # => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same # if we're a bi-dir ``MsgStream`` BECAUSE this same

View File

@ -131,12 +131,7 @@ class ActorNursery:
"main task" besides the runtime. "main task" besides the runtime.
''' '''
__runtimeframe__: int = 1 # noqa loglevel = loglevel or self._actor.loglevel or get_loglevel()
loglevel: str = (
loglevel
or self._actor.loglevel
or get_loglevel()
)
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
@ -214,7 +209,6 @@ class ActorNursery:
the actor is terminated. the actor is terminated.
''' '''
__runtimeframe__: int = 1 # noqa
mod_path: str = fn.__module__ mod_path: str = fn.__module__
if name is None: if name is None:
@ -263,7 +257,6 @@ class ActorNursery:
directly without any far end graceful ``trio`` cancellation. directly without any far end graceful ``trio`` cancellation.
''' '''
__runtimeframe__: int = 1 # noqa
self.cancelled = True self.cancelled = True
# TODO: impl a repr for spawn more compact # TODO: impl a repr for spawn more compact

View File

@ -27,6 +27,7 @@ from ._debug import (
pause as pause, pause as pause,
pause_from_sync as pause_from_sync, pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler, shield_sigint_handler as shield_sigint_handler,
MultiActorPdb as MultiActorPdb,
open_crash_handler as open_crash_handler, open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem, post_mortem as post_mortem,

View File

@ -1,177 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Tools for code-object annotation, introspection and mutation
as it pertains to improving the grok-ability of our runtime!
'''
from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
from types import (
FrameType,
FunctionType,
MethodType,
# CodeType,
)
from typing import (
# Any,
Callable,
# TYPE_CHECKING,
Type,
)
from tractor.msg import (
pretty_struct,
NamespacePath,
)
# TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func
# obj..
def get_class_from_frame(fr: FrameType) -> (
FunctionType
|MethodType
):
'''
Attempt to get the function (or method) reference
from a given `FrameType`.
Verbatim from an SO:
https://stackoverflow.com/a/2220759
'''
args, _, _, value_dict = inspect.getargvalues(fr)
# we check the first parameter for the frame function is
# named 'self'
if (
len(args)
and
# TODO: other cases for `@classmethod` etc..?)
args[0] == 'self'
):
# in that case, 'self' will be referenced in value_dict
instance: object = value_dict.get('self')
if instance:
# return its class
return getattr(
instance,
'__class__',
None,
)
# return None otherwise
return None
def func_ref_from_frame(
frame: FrameType,
) -> Callable:
func_name: str = frame.f_code.co_name
try:
return frame.f_globals[func_name]
except KeyError:
cls: Type|None = get_class_from_frame(frame)
if cls:
return getattr(
cls,
func_name,
)
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
rt_fi: inspect.FrameInfo
call_frame: FrameType
@property
def api_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.rt_fi.frame)
@property
def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
@property
def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)
@property
def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.caller_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
def find_caller_info(
dunder_var: str = '__runtimeframe__',
iframes:int = 1,
check_frame_depth: bool = True,
) -> CallerInfo|None:
'''
Scan up the callstack for a frame with a `dunder_var: str` variable
and return the `iframes` frames above it.
By default we scan for a `__runtimeframe__` scope var which
denotes a `tractor` API above which (one frame up) is "user
app code" which "called into" the `tractor` method or func.
TODO: ex with `Portal.open_context()`
'''
# TODO: use this instead?
# https://docs.python.org/3/library/inspect.html#inspect.getouterframes
frames: list[inspect.FrameInfo] = inspect.stack()
for fi in frames:
assert (
fi.function
==
fi.frame.f_code.co_name
)
this_frame: FrameType = fi.frame
dunder_val: int|None = this_frame.f_locals.get(dunder_var)
if dunder_val:
go_up_iframes: int = (
dunder_val # could be 0 or `True` i guess?
or
iframes
)
rt_frame: FrameType = fi.frame
call_frame = rt_frame
for i in range(go_up_iframes):
call_frame = call_frame.f_back
return CallerInfo(
rt_fi=fi,
call_frame=call_frame,
)
return None

File diff suppressed because it is too large Load Diff

View File

@ -23,31 +23,12 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors. disjoint, parallel executing tasks in separate actors.
''' '''
from __future__ import annotations
import multiprocessing as mp
from signal import ( from signal import (
signal, signal,
SIGUSR1, SIGUSR1,
) )
import traceback
from typing import TYPE_CHECKING
import trio import trio
from tractor import (
_state,
log as logmod,
)
log = logmod.get_logger(__name__)
if TYPE_CHECKING:
from tractor._spawn import ProcessType
from tractor import (
Actor,
ActorNursery,
)
@trio.lowlevel.disable_ki_protection @trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None: def dump_task_tree() -> None:
@ -60,15 +41,9 @@ def dump_task_tree() -> None:
recurse_child_tasks=True recurse_child_tasks=True
) )
) )
log = get_console_log( log = get_console_log('cancel')
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor()
log.pdb( log.pdb(
f'Dumping `stackscope` tree for actor\n' f'Dumping `stackscope` tree:\n\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
f'{tree_str}\n' f'{tree_str}\n'
) )
# import logging # import logging
@ -81,13 +56,8 @@ def dump_task_tree() -> None:
# ).exception("Error printing task tree") # ).exception("Error printing task tree")
def signal_handler( def signal_handler(sig: int, frame: object) -> None:
sig: int, import traceback
frame: object,
relay_to_subs: bool = True,
) -> None:
try: try:
trio.lowlevel.current_trio_token( trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree) ).run_sync_soon(dump_task_tree)
@ -95,26 +65,6 @@ def signal_handler(
# not in async context -- print a normal traceback # not in async context -- print a normal traceback
traceback.print_stack() traceback.print_stack()
if not relay_to_subs:
return
an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.pdb(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'
)
if isinstance(subproc, trio.Process):
subproc.send_signal(sig)
elif isinstance(subproc, mp.Process):
subproc._send_signal(sig)
def enable_stack_on_sig( def enable_stack_on_sig(
@ -132,6 +82,3 @@ def enable_stack_on_sig(
# NOTE: not the above can be triggered from # NOTE: not the above can be triggered from
# a (xonsh) shell using: # a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>') # kill -SIGUSR1 @$(pgrep -f '<cmd>')
#
# for example if you were looking to trace a `pytest` run
# kill -SIGUSR1 @$(pgrep -f 'pytest')

View File

@ -33,29 +33,25 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
contextmanager as cm, contextmanager as cm,
) )
from contextvars import ( # from contextvars import (
ContextVar, # ContextVar,
Token, # Token,
) # )
import textwrap import textwrap
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Type, Type,
TYPE_CHECKING,
Union, Union,
) )
from types import ModuleType from types import ModuleType
import msgspec import msgspec
from msgspec import ( from msgspec import msgpack
msgpack, from trio.lowlevel import (
Raw, RunVar,
RunVarToken,
) )
# from trio.lowlevel import (
# RunVar,
# RunVarToken,
# )
# TODO: see notes below from @mikenerone.. # TODO: see notes below from @mikenerone..
# from tricycle import TreeVar # from tricycle import TreeVar
@ -66,9 +62,6 @@ from tractor.msg.types import (
) )
from tractor.log import get_logger from tractor.log import get_logger
if TYPE_CHECKING:
from tractor._context import Context
log = get_logger(__name__) log = get_logger(__name__)
# TODO: overall IPC msg-spec features (i.e. in this mod)! # TODO: overall IPC msg-spec features (i.e. in this mod)!
@ -164,6 +157,24 @@ class MsgCodec(Struct):
lib: ModuleType = msgspec lib: ModuleType = msgspec
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# TODO: use `functools.cached_property` for these ? # TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property # https://docs.python.org/3/library/functools.html#functools.cached_property
@property @property
@ -199,25 +210,7 @@ class MsgCodec(Struct):
# https://jcristharif.com/msgspec/usage.html#typed-decoding # https://jcristharif.com/msgspec/usage.html#typed-decoding
return self._dec.decode(msg) return self._dec.decode(msg)
# TODO: a sub-decoder system as well? # TODO: do we still want to try and support the sub-decoder with
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
#
# -[ ] do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives # `.Raw` technique in the case that the `Generic` approach gives
# future grief? # future grief?
# #
@ -436,9 +429,6 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# #
_def_tractor_codec: MsgCodec = mk_codec( _def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any, ipc_pld_spec=Any,
# TODO: use this for debug mode locking prot?
# ipc_pld_spec=Raw,
) )
# TODO: IDEALLY provides for per-`trio.Task` specificity of the # TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing # IPC msging codec used by the transport layer when doing
@ -472,9 +462,11 @@ _def_tractor_codec: MsgCodec = mk_codec(
# TODO: STOP USING THIS, since it's basically a global and won't # TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired.. # allow sub-IPC-ctxs to limit the msg-spec however desired..
# _ctxvar_MsgCodec: MsgCodec = RunVar( _ctxvar_MsgCodec: MsgCodec = RunVar(
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec', 'msgspec_codec',
# TODO: move this to our new `Msg`-spec!
# default=_def_msgspec_codec,
default=_def_tractor_codec, default=_def_tractor_codec,
) )
@ -483,36 +475,23 @@ _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
def apply_codec( def apply_codec(
codec: MsgCodec, codec: MsgCodec,
ctx: Context|None = None,
) -> MsgCodec: ) -> MsgCodec:
''' '''
Dynamically apply a `MsgCodec` to the current task's runtime Dynamically apply a `MsgCodec` to the current task's
context such that all (of a certain class of payload runtime context such that all IPC msgs are processed
containing i.e. `MsgType.pld: PayloadT`) IPC msgs are with it for that task.
processed with it for that task.
Uses a `contextvars.ContextVar` to ensure the scope of any
codec setting matches the current `Context` or
`._rpc.process_messages()` feeder task's prior setting without
mutating any surrounding scope.
When a `ctx` is supplied, only mod its `Context.pld_codec`.
Uses a `tricycle.TreeVar` to ensure the scope of the codec
matches the `@cm` block and DOES NOT change to the original matches the `@cm` block and DOES NOT change to the original
(default) value in new tasks (as it does for `ContextVar`). (default) value in new tasks (as it does for `ContextVar`).
See the docs:
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
orig: MsgCodec = _ctxvar_MsgCodec.get()
if ctx is not None:
var: ContextVar = ctx._var_pld_codec
else:
# use IPC channel-connection "global" codec
var: ContextVar = _ctxvar_MsgCodec
orig: MsgCodec = var.get()
assert orig is not codec assert orig is not codec
if codec.pld_spec is None: if codec.pld_spec is None:
breakpoint() breakpoint()
@ -521,25 +500,22 @@ def apply_codec(
'Applying new msg-spec codec\n\n' 'Applying new msg-spec codec\n\n'
f'{codec}\n' f'{codec}\n'
) )
token: Token = var.set(codec) token: RunVarToken = _ctxvar_MsgCodec.set(codec)
# ?TODO? for TreeVar approach which copies from the # TODO: for TreeVar approach, see docs for @cm `.being()` API:
# cancel-scope of the prior value, NOT the prior task # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# See the docs: # try:
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
# ^- see docs for @cm `.being()` API
# with _ctxvar_MsgCodec.being(codec): # with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get() # new = _ctxvar_MsgCodec.get()
# assert new is codec # assert new is codec
# yield codec # yield codec
try: try:
yield var.get() yield _ctxvar_MsgCodec.get()
finally: finally:
var.reset(token) _ctxvar_MsgCodec.reset(token)
assert var.get() is orig assert _ctxvar_MsgCodec.get() is orig
log.info( log.info(
'Reverted to last msg-spec codec\n\n' 'Reverted to last msg-spec codec\n\n'
f'{orig}\n' f'{orig}\n'

View File

@ -76,11 +76,9 @@ class NamespacePath(str):
return self._ref return self._ref
@staticmethod @staticmethod
def _mk_fqnp( def _mk_fqnp(ref: type | object) -> tuple[str, str]:
ref: type|object,
) -> tuple[str, str]:
''' '''
Generate a minial `str` pair which describes a python Generate a minial ``str`` pair which describes a python
object's namespace path and object/type name. object's namespace path and object/type name.
In more precise terms something like: In more precise terms something like:
@ -89,9 +87,10 @@ class NamespacePath(str):
of THIS type XD of THIS type XD
''' '''
if isfunction(ref): if (
isfunction(ref)
):
name: str = getattr(ref, '__name__') name: str = getattr(ref, '__name__')
mod_name: str = ref.__module__
elif ismethod(ref): elif ismethod(ref):
# build out the path manually i guess..? # build out the path manually i guess..?
@ -100,19 +99,15 @@ class NamespacePath(str):
type(ref.__self__).__name__, type(ref.__self__).__name__,
ref.__func__.__name__, ref.__func__.__name__,
]) ])
mod_name: str = ref.__self__.__module__
else: # object or other? else: # object or other?
# isinstance(ref, object) # isinstance(ref, object)
# and not isfunction(ref) # and not isfunction(ref)
name: str = type(ref).__name__ name: str = type(ref).__name__
mod_name: str = ref.__module__
# TODO: return static value direactly?
#
# fully qualified namespace path, tuple. # fully qualified namespace path, tuple.
fqnp: tuple[str, str] = ( fqnp: tuple[str, str] = (
mod_name, ref.__module__,
name, name,
) )
return fqnp return fqnp
@ -120,7 +115,7 @@ class NamespacePath(str):
@classmethod @classmethod
def from_ref( def from_ref(
cls, cls,
ref: type|object, ref: type | object,
) -> NamespacePath: ) -> NamespacePath: