Compare commits


No commits in common. "9b4f5804706b2c7effcf9c39d579c3c2bdb41e25" and "b209990d045c9069b11279916dc529bbe3152b0b" have entirely different histories.

17 changed files with 560 additions and 1445 deletions


@@ -14,20 +14,19 @@ from typing import (
from contextvars import (
Context,
)
# from inspect import Parameter
from msgspec import (
structs,
msgpack,
# defstruct,
Struct,
ValidationError,
)
import pytest
import tractor
from tractor import (
_state,
MsgTypeError,
)
from tractor import _state
from tractor.msg import (
_codec,
_ctxvar_MsgCodec,
@@ -48,6 +47,21 @@ from tractor.msg.types import (
import trio
def test_msg_spec_xor_pld_spec():
'''
If the `.msg.types.Msg`-set is overridden, we
can't also support a `Msg.pld` spec.
'''
# apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types.
with pytest.raises(RuntimeError):
mk_codec(
ipc_msg_spec=Any,
ipc_pld_spec=NamespacePath,
)
def mk_custom_codec(
pld_spec: Union[Type]|Any,
add_hooks: bool,
@@ -120,9 +134,7 @@ def mk_custom_codec(
f'{uid}\n'
'FAILED DECODE\n'
f'type-> {obj_type}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n\n'
f'current codec:\n'
f'{current_codec()}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n'
)
# TODO: figure out the ignore subsys for this!
# -[ ] option whether to defense-relay back the msg
@@ -397,9 +409,7 @@ async def send_back_values(
pld_spec=ipc_pld_spec,
add_hooks=add_hooks,
)
with (
apply_codec(nsp_codec) as codec,
):
with apply_codec(nsp_codec) as codec:
chk_codec_applied(
expect_codec=nsp_codec,
enter_value=codec,
@@ -449,7 +459,7 @@ async def send_back_values(
# XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
# `str` handling! or special debug mode IPC
# msgs!
await tractor.pause()
# await tractor.pause()
raise RuntimeError(
f'NOT-EXPECTED able to roundtrip value given spec:\n'
@@ -460,8 +470,7 @@ async def send_back_values(
break # move on to streaming block..
except tractor.MsgTypeError:
await tractor.pause()
# await tractor.pause()
if expect_send:
raise RuntimeError(
f'EXPECTED to `.started()` value given spec:\n'
@@ -643,42 +652,12 @@ def test_codec_hooks_mod(
pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)
# XXX should raise an mte (`MsgTypeError`)
# when `add_codec_hooks == False` bc the input
# `expect_ipc_send` kwarg has a nsp which can't be
# serialized!
#
# TODO:can we ensure this happens from the
# `Return`-side (aka the sub) as well?
if not add_codec_hooks:
try:
async with p.open_context(
send_back_values,
expect_debug=debug_mode,
pld_spec_type_strs=pld_spec_type_strs,
add_hooks=add_codec_hooks,
started_msg_bytes=nsp_codec.encode(expected_started),
# XXX NOTE bc we send a `NamespacePath` in this kwarg
expect_ipc_send=expect_ipc_send,
) as (ctx, first):
pytest.fail('ctx should fail to open without custom enc_hook!?')
# this test passes bc we can go no further!
except MsgTypeError:
# teardown nursery
await p.cancel_actor()
return
# TODO: send the original nsp here and
# test with `limit_msg_spec()` above?
# await tractor.pause()
print('PARENT opening IPC ctx!\n')
async with (
# XXX should raise an mte (`MsgTypeError`)
# when `add_codec_hooks == False`..
p.open_context(
send_back_values,
expect_debug=debug_mode,


@@ -26,7 +26,6 @@ disjoint, parallel executing tasks in separate actors.
from __future__ import annotations
from collections import deque
from contextlib import asynccontextmanager as acm
from contextvars import ContextVar
from dataclasses import (
dataclass,
field,
@@ -57,7 +56,6 @@ from ._exceptions import (
)
from .log import get_logger
from .msg import (
_codec,
Error,
MsgType,
MsgCodec,
@@ -82,9 +80,6 @@ if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
from ._ipc import MsgTransport
from .devx._code import (
CallerInfo,
)
log = get_logger(__name__)
@@ -504,18 +499,6 @@ class Context:
_started_called: bool = False
_stream_opened: bool = False
_stream: MsgStream|None = None
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
'pld_codec',
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
)
@property
def pld_codec(self) -> MsgCodec|None:
return self._pld_codec_var.get()
# caller of `Portal.open_context()` for
# logging purposes mostly
_caller_info: CallerInfo|None = None
# overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote
@@ -542,7 +525,6 @@ class Context:
# TODO: figure out how we can enforce this without losing our minds..
_strict_started: bool = False
_cancel_on_msgerr: bool = True
def __str__(self) -> str:
ds: str = '='
@@ -875,7 +857,6 @@ class Context:
# TODO: never do this right?
# if self._remote_error:
# return
peer_side: str = self.peer_side(self.side)
# XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this
@@ -883,15 +864,14 @@
# appropriately.
log.runtime(
'Setting remote error for ctx\n\n'
f'<= {peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n'
f'{error}'
f'<= remote ctx uid: {self.chan.uid}\n'
f'=>{error}'
)
self._remote_error: BaseException = error
# self-cancel (ack) or,
# peer propagated remote cancellation.
msgerr: bool = False
msgtyperr: bool = False
if isinstance(error, ContextCancelled):
whom: str = (
@@ -904,7 +884,7 @@
)
elif isinstance(error, MsgTypeError):
msgerr = True
msgtyperr = True
peer_side: str = self.peer_side(self.side)
log.error(
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
@@ -955,24 +935,13 @@
and not self._is_self_cancelled()
and not cs.cancel_called
and not cs.cancelled_caught
and (
msgerr
and
# NOTE: allow user to config not cancelling the
# local scope on `MsgTypeError`s
self._cancel_on_msgerr
)
and not msgtyperr
):
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
log.cancel('Cancelling local `.open_context()` scope!')
self._scope.cancel()
else:
log.cancel('NOT cancelling local `.open_context()` scope!')
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
@@ -997,7 +966,9 @@
dmaddr = dst_maddr
@property
def repr_rpc(self) -> str:
def repr_rpc(
self,
) -> str:
# TODO: how to show the transport interchange fmt?
# codec: str = self.chan.transport.codec_key
outcome_str: str = self.repr_outcome(
@@ -1009,27 +980,6 @@
f'{self._nsf}() -> {outcome_str}:'
)
@property
def repr_caller(self) -> str:
ci: CallerInfo|None = self._caller_info
if ci:
return (
f'{ci.caller_nsp}()'
# f'|_api: {ci.api_nsp}'
)
return '<UNKNOWN caller-frame>'
@property
def repr_api(self) -> str:
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
# f'{ci.api_nsp}()\n'
# )
return 'Portal.open_context()'
async def cancel(
self,
timeout: float = 0.616,
@@ -1234,9 +1184,8 @@
)
# NOTE: in one way streaming this only happens on the
# parent-ctx-task side (on the side that calls
# `Actor.start_remote_task()`) so if you try to send
# a stop from the caller to the callee in the
# caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx: Context = actor.get_context(
@@ -1901,19 +1850,6 @@ class Context:
send_chan: trio.MemorySendChannel = self._send_chan
nsf: NamespacePath = self._nsf
side: str = self.side
if side == 'child':
assert not self._portal
peer_side: str = self.peer_side(side)
flow_body: str = (
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n'
f'=> {side!r}: {self._task}\n'
f' |_<{self.repr_api} @ {self.repr_caller}>\n\n'
)
re: Exception|None
if re := unpack_error(
msg,
@@ -1924,10 +1860,18 @@
else:
log_meth = log.runtime
side: str = self.side
peer_side: str = self.peer_side(side)
log_meth(
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
f'{flow_body}'
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> {side!r} cid: {cid}\n'
f' |_{self._task}\n\n'
f'{pformat(re)}\n'
)
@@ -1940,27 +1884,30 @@
# or `RemoteActorError`).
self._maybe_cancel_and_set_remote_error(re)
# TODO: expose as mod func instead!
# XXX only case where returning early is fine!
structfmt = pretty_struct.Struct.pformat
if self._in_overrun:
log.warning(
f'Queueing OVERRUN msg on caller task:\n\n'
f'Queueing OVERRUN msg on caller task:\n'
f'<= peer: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'{flow_body}'
f'=> cid: {cid}\n'
f' |_{self._task}\n\n'
f'{structfmt(msg)}\n'
)
self._overflow_q.append(msg)
# XXX NOTE XXX
# overrun is the ONLY case where returning early is fine!
return False
try:
log.runtime(
f'Delivering msg from IPC ctx:\n\n'
f'Delivering msg from IPC ctx:\n'
f'<= {from_uid}\n'
f' |_ {nsf}()\n\n'
f'{flow_body}'
f'=> {self._task}\n'
f' |_cid={self.cid}\n\n'
f'{structfmt(msg)}\n'
)
@@ -1992,7 +1939,6 @@
f'cid: {self.cid}\n'
'Failed to deliver msg:\n'
f'send_chan: {send_chan}\n\n'
f'{pformat(msg)}\n'
)
return False
@@ -2146,12 +2092,6 @@ async def open_context_from_portal(
'''
__tracebackhide__: bool = hide_tb
# denote this frame as a "runtime frame" for stack
# introspection where we report the caller code in logging
# and error message content.
# NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa
# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)
@@ -2179,8 +2119,6 @@
nsf=nsf,
kwargs=kwargs,
portal=portal,
# NOTE: it's important to expose this since you might
# get the case where the parent who opened the context does
# not open a stream until after some slow startup/init
@@ -2191,17 +2129,13 @@
# place..
allow_overruns=allow_overruns,
)
assert ctx._remote_func_type == 'context'
assert ctx._caller_info
# ASAP, so that `Context.side: str` can be determined for
# logging / tracing / debug!
ctx._portal: Portal = portal
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
# NOT actually cancel the below line!
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
assert ctx._remote_func_type == 'context'
msg: Started = await ctx._recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
@@ -2211,7 +2145,6 @@
# except KeyError as src_error:
except AttributeError as src_error:
log.exception('Raising from unexpected msg!\n')
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
@@ -2637,6 +2570,7 @@ async def open_context_from_portal(
None,
)
def mk_context(
chan: Channel,
cid: str,
@@ -2658,10 +2592,6 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high!
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
ctx = Context(
chan=chan,
cid=cid,
@@ -2670,7 +2600,6 @@ def mk_context(
_recv_chan=recv_chan,
_nsf=nsf,
_task=trio.lowlevel.current_task(),
_caller_info=caller_info,
**kwargs,
)
# TODO: we can drop the old placeholder yah?
@@ -2681,11 +2610,7 @@
def context(func: Callable) -> Callable:
'''
Mark an (async) function as an SC-supervised, inter-`Actor`,
child-`trio.Task`, IPC endpoint otherwise known more
colloquially as a (RPC) "context".
Functions annotated this way are the fundamental IPC endpoint type offered by `tractor`.
Mark an async function as a streaming routine with ``@context``.
'''
# TODO: apply whatever solution ``mypy`` ends up picking for this:


@@ -935,7 +935,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
def _raise_from_no_key_in_msg(
ctx: Context,
msg: MsgType,
src_err: AttributeError,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_msg: str = Yield,
@@ -994,7 +994,7 @@ def _raise_from_no_key_in_msg(
ctx.chan,
hide_tb=hide_tb,
) from src_err
) from None
# `MsgStream` termination msg.
# TODO: does it make more sense to pack


@@ -314,7 +314,8 @@ class MsgpackTCPStream(MsgTransport):
while True:
try:
header: bytes = await self.recv_stream.receive_exactly(4)
header = await self.recv_stream.receive_exactly(4)
except (
ValueError,
ConnectionResetError,
@@ -336,7 +337,8 @@ class MsgpackTCPStream(MsgTransport):
size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore
msg_bytes: bytes = await self.recv_stream.receive_exactly(size)
msg_bytes = await self.recv_stream.receive_exactly(size)
log.transport(f"received {msg_bytes}") # type: ignore
try:
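Aside from the dropped local type annotations, the two hunks above show the transport's wire framing: each msg is a 4-byte little-endian unsigned-int length header followed by exactly that many `msgpack`-encoded payload bytes. A minimal read-side sketch, assuming a stream object exposing the same `receive_exactly()` API used above:

```python
import struct

async def read_one_msg(recv_stream) -> bytes:
    # 4-byte header: payload length as a little-endian uint32 ('<I')
    header: bytes = await recv_stream.receive_exactly(4)
    size, = struct.unpack('<I', header)
    # then exactly `size` bytes of `msgpack`-encoded msg payload
    return await recv_stream.receive_exactly(size)
```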


@@ -161,18 +161,17 @@ class Portal:
self._expect_result = await self.actor.start_remote_task(
self.channel,
nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs,
portal=self,
kwargs=kwargs
)
async def _return_once(
self,
ctx: Context,
) -> Return:
) -> dict[str, Any]:
assert ctx._remote_func_type == 'asyncfunc' # single response
msg: Return = await ctx._recv_chan.receive()
msg: dict = await ctx._recv_chan.receive()
return msg
async def result(self) -> Any:
@@ -248,8 +247,6 @@ class Portal:
purpose.
'''
__runtimeframe__: int = 1 # noqa
chan: Channel = self.channel
if not chan.connected():
log.runtime(
@@ -327,18 +324,16 @@
internals!
'''
__runtimeframe__: int = 1 # noqa
nsf = NamespacePath(
f'{namespace_path}:{function_name}'
)
ctx: Context = await self.actor.start_remote_task(
ctx = await self.actor.start_remote_task(
chan=self.channel,
nsf=nsf,
kwargs=kwargs,
portal=self,
)
ctx._portal: Portal = self
msg: Return = await self._return_once(ctx)
ctx._portal = self
msg = await self._return_once(ctx)
return _unwrap_msg(
msg,
self.channel,
@@ -389,7 +384,6 @@
self.channel,
nsf=nsf,
kwargs=kwargs,
portal=self,
)
ctx._portal = self
return _unwrap_msg(
@@ -404,14 +398,6 @@
**kwargs,
) -> AsyncGenerator[MsgStream, None]:
'''
Legacy one-way streaming API.
TODO: re-impl on top `Portal.open_context()` + an async gen
around `Context.open_stream()`.
'''
__runtimeframe__: int = 1 # noqa
if not inspect.isasyncgenfunction(async_gen_func):
if not (
@@ -425,7 +411,6 @@
self.channel,
nsf=NamespacePath.from_ref(async_gen_func),
kwargs=kwargs,
portal=self,
)
ctx._portal = self


@@ -135,7 +135,7 @@ async def open_root_actor(
# attempt to retrieve ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True


@@ -814,7 +814,7 @@ async def process_messages(
# should use it?
# https://github.com/python-trio/trio/issues/467
log.runtime(
'Entering RPC msg loop:\n'
'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
@@ -876,7 +876,7 @@
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
pdb_complete: trio.Event|None = _debug.DebugStatus.repl_release
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
@@ -1073,7 +1073,7 @@
log.exception(message)
raise RuntimeError(message)
log.transport(
log.runtime(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'


@@ -267,13 +267,10 @@ class Actor:
self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None
# track each child/sub-actor in it's locally
# supervising nursery
self._actoruid2nursery: dict[
tuple[str, str], # sub-`Actor.uid`
tuple[str, str],
ActorNursery|None,
] = {}
] = {} # type: ignore # noqa
# when provided, init the registry addresses property from
# input via the validator.
@@ -662,18 +659,12 @@ class Actor:
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
pdb_user_uid
and local_nursery
):
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
if entry:
proc: trio.Process
_, proc, _ = entry
@@ -683,10 +674,10 @@
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT\n'
'Root actor reports no-more-peers, BUT '
'a DISCONNECTED child still has the debug '
'lock!\n\n'
# f'root uid: {self.uid}\n'
'lock!\n'
f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
@@ -712,8 +703,9 @@
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if (
(db_cs := pdb_lock.get_locking_task_cs())
db_cs
and not db_cs.cancel_called
and uid == pdb_user_uid
):
@@ -750,7 +742,7 @@
except KeyError:
log.warning(
'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n\n'
f'<= sender: {uid}\n'
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'
@@ -804,7 +796,7 @@
cid,
# side,
)]
log.debug(
log.runtime(
f'Retrieved cached IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid:{cid}\n'
@@ -843,14 +835,10 @@
nsf: NamespacePath,
kwargs: dict,
# determines `Context.side: str`
portal: Portal|None = None,
# IPC channel config
msg_buffer_size: int|None = None,
allow_overruns: bool = False,
load_nsf: bool = False,
ack_timeout: float = 3,
) -> Context:
'''
@@ -875,12 +863,10 @@
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._portal = portal
if (
'self' in nsf
or
not load_nsf
or not load_nsf
):
ns, _, func = nsf.partition(':')
else:
@@ -888,29 +874,42 @@
# -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple()
msg = msgtypes.Start(
log.runtime(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
)
await chan.send(
msgtypes.Start(
ns=ns,
func=func,
kwargs=kwargs,
uid=self.uid,
cid=cid,
)
log.runtime(
'Sending RPC start msg\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n'
)
await chan.send(msg)
# {'cmd': (
# ns,
# func,
# kwargs,
# self.uid,
# cid,
# )}
# )
# Wait on first response msg and validate; this should be
# immediate.
# first_msg: dict = await ctx._recv_chan.receive()
# functype: str = first_msg.get('functype')
# NOTE wait on first `StartAck` response msg and validate;
# this should be immediate and does not (yet) wait for the
# remote child task to sync via `Context.started()`.
with trio.fail_after(ack_timeout):
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
try:
functype: str = first_msg.functype
except AttributeError:
raise unpack_error(first_msg, chan)
# if 'error' in first_msg:
# raise unpack_error(first_msg, chan)
if functype not in (
'asyncfunc',
@@ -918,7 +917,7 @@
'context',
):
raise ValueError(
f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
f'{first_msg} is an invalid response packet?'
)
ctx._remote_func_type = functype
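Pieced together, the `Start`/`StartAck` request-ack flow the hunks above implement looks roughly like this sketch; the `msgtypes` import path and the middle `functype` value are assumptions:

```python
import trio
from tractor.msg import types as msgtypes  # import path assumed

async def start_handshake(
    chan,   # IPC channel to the peer actor
    ctx,    # freshly allocated `Context` for this `cid`
    ns: str,
    func: str,
    kwargs: dict,
    uid: tuple[str, str],
    cid: str,
    ack_timeout: float = 3,
) -> str:
    # 1) request: ask the peer to schedule `ns:func` as an RPC task
    await chan.send(
        msgtypes.Start(ns=ns, func=func, kwargs=kwargs, uid=uid, cid=cid)
    )
    # 2) bounded wait for the immediate `StartAck`; this does NOT wait
    #    for the remote task to sync via `Context.started()`
    with trio.fail_after(ack_timeout):
        first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
    # 3) validate and record the remote callable's type
    functype: str = first_msg.functype
    assert functype in ('asyncfunc', 'asyncgen', 'context')  # middle value assumed
    ctx._remote_func_type = functype
    return functype
```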
@@ -1163,7 +1162,7 @@
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
dbcs = _debug.DebugStatus.req_cs
dbcs = _debug.Lock._debugger_request_cs
if dbcs is not None:
msg += (
'>> Cancelling active debugger request..\n'
@@ -1238,9 +1237,9 @@
except KeyError:
# NOTE: during msging race conditions this will often
# emit, some examples:
# - child returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request,
# - child errors prior to cancel req.
# - callee returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n'
@@ -1303,15 +1302,15 @@
flow_info: str = (
f'<= canceller: {requesting_uid}\n'
f'=> ipc-parent: {parent_chan}\n'
f'|_{ctx}\n'
f' |_{ctx}\n'
)
log.runtime(
'Waiting on RPC task to cancel\n\n'
'Waiting on RPC task to cancel\n'
f'{flow_info}'
)
await is_complete.wait()
log.runtime(
f'Successfully cancelled RPC task\n\n'
f'Successfully cancelled RPC task\n'
f'{flow_info}'
)
return True
@@ -1537,8 +1536,8 @@ async def async_main(
'''
# attempt to retrieve ``trio``'s sigint handler and stash it
# on our debugger state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
is_registered: bool = False
try:


@@ -30,16 +30,11 @@ if TYPE_CHECKING:
_current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None
# TODO: mk this a `msgspec.Struct`!
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
'_is_root': False,
'_root_mailbox': (None, None),
'_registry_addrs': [],
# for `breakpoint()` support
'use_greenback': False,
}
@@ -66,7 +61,7 @@
err_on_no_runtime
and _current_actor is None
):
msg: str = 'No local actor has been initialized yet?\n'
msg: str = 'No local actor has been initialized yet'
from ._exceptions import NoRuntime
if last := last_actor():
@@ -79,8 +74,8 @@
# this process.
else:
msg += (
# 'No last actor found?\n'
'\nDid you forget to call one of,\n'
'No last actor found?\n'
'Did you forget to open one of:\n\n'
'- `tractor.open_root_actor()`\n'
'- `tractor.open_nursery()`\n'
)
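A quick sketch of when that message fires: calling `current_actor()` before any runtime is opened raises `NoRuntime` (assumed here to derive from `RuntimeError`) carrying the hint text above:

```python
import tractor

try:
    tractor.current_actor()  # no `open_root_actor()`/`open_nursery()` yet
except RuntimeError as err:  # `NoRuntime`, assumed RuntimeError subclass
    print(err)  # 'No local actor has been initialized yet ...'
```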


@@ -377,17 +377,14 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose()
if not self._eoc:
message: str = (
f'Context stream closed by {self._ctx.side!r}\n'
log.cancel(
'Stream closed by self before it received an EoC?\n'
'Setting eoc manually..\n..'
)
self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by self({self._ctx.side})\n'
f'|_{self}\n'
)
log.cancel(
'Stream self-closed before receiving EoC\n\n'
+
message
)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same


@@ -131,12 +131,7 @@ class ActorNursery:
"main task" besides the runtime.
'''
__runtimeframe__: int = 1 # noqa
loglevel: str = (
loglevel
or self._actor.loglevel
or get_loglevel()
)
loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
@@ -214,7 +209,6 @@
the actor is terminated.
'''
__runtimeframe__: int = 1 # noqa
mod_path: str = fn.__module__
if name is None:
@@ -263,7 +257,6 @@
directly without any far end graceful ``trio`` cancellation.
'''
__runtimeframe__: int = 1 # noqa
self.cancelled = True
# TODO: impl a repr for spawn more compact


@@ -27,6 +27,7 @@ from ._debug import (
pause as pause,
pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler,
MultiActorPdb as MultiActorPdb,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem,


@@ -1,177 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Tools for code-object annotation, introspection and mutation
as it pertains to improving the grok-ability of our runtime!
'''
from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
from types import (
FrameType,
FunctionType,
MethodType,
# CodeType,
)
from typing import (
# Any,
Callable,
# TYPE_CHECKING,
Type,
)
from tractor.msg import (
pretty_struct,
NamespacePath,
)
# TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func
# obj..
def get_class_from_frame(fr: FrameType) -> (
FunctionType
|MethodType
):
'''
Attempt to get the function (or method) reference
from a given `FrameType`.
Verbatim from an SO:
https://stackoverflow.com/a/2220759
'''
args, _, _, value_dict = inspect.getargvalues(fr)
# we check the first parameter for the frame function is
# named 'self'
if (
len(args)
and
# TODO: other cases for `@classmethod` etc..?)
args[0] == 'self'
):
# in that case, 'self' will be referenced in value_dict
instance: object = value_dict.get('self')
if instance:
# return its class
return getattr(
instance,
'__class__',
None,
)
# return None otherwise
return None
def func_ref_from_frame(
frame: FrameType,
) -> Callable:
func_name: str = frame.f_code.co_name
try:
return frame.f_globals[func_name]
except KeyError:
cls: Type|None = get_class_from_frame(frame)
if cls:
return getattr(
cls,
func_name,
)
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
rt_fi: inspect.FrameInfo
call_frame: FrameType
@property
def api_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.rt_fi.frame)
@property
def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
@property
def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)
@property
def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.caller_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
def find_caller_info(
dunder_var: str = '__runtimeframe__',
iframes:int = 1,
check_frame_depth: bool = True,
) -> CallerInfo|None:
'''
Scan up the callstack for a frame with a `dunder_var: str` variable
and return the `iframes` frames above it.
By default we scan for a `__runtimeframe__` scope var which
denotes a `tractor` API above which (one frame up) is "user
app code" which "called into" the `tractor` method or func.
TODO: ex with `Portal.open_context()`
'''
# TODO: use this instead?
# https://docs.python.org/3/library/inspect.html#inspect.getouterframes
frames: list[inspect.FrameInfo] = inspect.stack()
for fi in frames:
assert (
fi.function
==
fi.frame.f_code.co_name
)
this_frame: FrameType = fi.frame
dunder_val: int|None = this_frame.f_locals.get(dunder_var)
if dunder_val:
go_up_iframes: int = (
dunder_val # could be 0 or `True` i guess?
or
iframes
)
rt_frame: FrameType = fi.frame
call_frame = rt_frame
for i in range(go_up_iframes):
call_frame = call_frame.f_back
return CallerInfo(
rt_fi=fi,
call_frame=call_frame,
)
return None
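For context on how this (deleted) module was driven, see the `__runtimeframe__: int = 1  # noqa` markers added to `Portal` methods elsewhere in this diff: a runtime API annotates its own frame and the scanner reports the user frame one level up. A hypothetical usage sketch:

```python
def some_runtime_api() -> None:
    # the dunder marks this frame as a runtime API; its int value is
    # how many frames up the *user* caller lives (here: one)
    __runtimeframe__: int = 1  # noqa
    ci: CallerInfo|None = find_caller_info()
    if ci:
        # e.g. prints 'mymod:user_code()' (hypothetical names)
        print(f'called from {ci.caller_nsp}()')

def user_code() -> None:
    some_runtime_api()  # -> reports `user_code` as the caller
```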

File diff suppressed because it is too large.


@@ -23,31 +23,12 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from __future__ import annotations
import multiprocessing as mp
from signal import (
signal,
SIGUSR1,
)
import traceback
from typing import TYPE_CHECKING
import trio
from tractor import (
_state,
log as logmod,
)
log = logmod.get_logger(__name__)
if TYPE_CHECKING:
from tractor._spawn import ProcessType
from tractor import (
Actor,
ActorNursery,
)
@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:
@@ -60,15 +41,9 @@ def dump_task_tree() -> None:
recurse_child_tasks=True
)
)
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor()
log = get_console_log('cancel')
log.pdb(
f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
f'Dumping `stackscope` tree:\n\n'
f'{tree_str}\n'
)
# import logging
@@ -81,13 +56,8 @@ def dump_task_tree() -> None:
# ).exception("Error printing task tree")
def signal_handler(
sig: int,
frame: object,
relay_to_subs: bool = True,
) -> None:
def signal_handler(sig: int, frame: object) -> None:
import traceback
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)
@@ -95,26 +65,6 @@ def signal_handler(
# not in async context -- print a normal traceback
traceback.print_stack()
if not relay_to_subs:
return
an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.pdb(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'
)
if isinstance(subproc, trio.Process):
subproc.send_signal(sig)
elif isinstance(subproc, mp.Process):
subproc._send_signal(sig)
def enable_stack_on_sig(
@@ -132,6 +82,3 @@ def enable_stack_on_sig(
# NOTE: the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
#
# for example if you were looking to trace a `pytest` run
# kill -SIGUSR1 @$(pgrep -f 'pytest')


@@ -33,29 +33,25 @@ from __future__ import annotations
from contextlib import (
contextmanager as cm,
)
from contextvars import (
ContextVar,
Token,
)
# from contextvars import (
# ContextVar,
# Token,
# )
import textwrap
from typing import (
Any,
Callable,
Type,
TYPE_CHECKING,
Union,
)
from types import ModuleType
import msgspec
from msgspec import (
msgpack,
Raw,
from msgspec import msgpack
from trio.lowlevel import (
RunVar,
RunVarToken,
)
# from trio.lowlevel import (
# RunVar,
# RunVarToken,
# )
# TODO: see notes below from @mikenerone..
# from tricycle import TreeVar
@@ -66,9 +62,6 @@ from tractor.msg.types import (
)
from tractor.log import get_logger
if TYPE_CHECKING:
from tractor._context import Context
log = get_logger(__name__)
# TODO: overall IPC msg-spec features (i.e. in this mod)!
@@ -164,6 +157,24 @@ class MsgCodec(Struct):
lib: ModuleType = msgspec
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property
@property
@@ -199,25 +210,7 @@
# https://jcristharif.com/msgspec/usage.html#typed-decoding
return self._dec.decode(msg)
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
#
# -[ ] do we still want to try and support the sub-decoder with
# TODO: do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives
# future grief?
#
@@ -436,9 +429,6 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
#
_def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any,
# TODO: use this for debug mode locking prot?
# ipc_pld_spec=Raw,
)
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing
@@ -472,9 +462,11 @@ _def_tractor_codec: MsgCodec = mk_codec(
# TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired..
# _ctxvar_MsgCodec: MsgCodec = RunVar(
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
_ctxvar_MsgCodec: MsgCodec = RunVar(
'msgspec_codec',
# TODO: move this to our new `Msg`-spec!
# default=_def_msgspec_codec,
default=_def_tractor_codec,
)
@@ -483,36 +475,23 @@ _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
def apply_codec(
codec: MsgCodec,
ctx: Context|None = None,
) -> MsgCodec:
'''
Dynamically apply a `MsgCodec` to the current task's runtime
context such that all (of a certain class of payload
containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
processed with it for that task.
Uses a `contextvars.ContextVar` to ensure the scope of any
codec setting matches the current `Context` or
`._rpc.process_messages()` feeder task's prior setting without
mutating any surrounding scope.
When a `ctx` is supplied, only mod its `Context.pld_codec`.
Dynamically apply a `MsgCodec` to the current task's
runtime context such that all IPC msgs are processed
with it for that task.
Uses a `tricycle.TreeVar` to ensure the scope of the codec
matches the `@cm` block and DOES NOT change to the original
(default) value in new tasks (as it does for `ContextVar`).
See the docs:
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
'''
__tracebackhide__: bool = True
if ctx is not None:
var: ContextVar = ctx._var_pld_codec
else:
# use IPC channel-connection "global" codec
var: ContextVar = _ctxvar_MsgCodec
orig: MsgCodec = var.get()
orig: MsgCodec = _ctxvar_MsgCodec.get()
assert orig is not codec
if codec.pld_spec is None:
breakpoint()
@@ -521,25 +500,22 @@ def apply_codec(
'Applying new msg-spec codec\n\n'
f'{codec}\n'
)
token: Token = var.set(codec)
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
# ?TODO? for TreeVar approach which copies from the
# cancel-scope of the prior value, NOT the prior task
# See the docs:
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
# ^- see docs for @cm `.being()` API
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# try:
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
try:
yield var.get()
yield _ctxvar_MsgCodec.get()
finally:
var.reset(token)
_ctxvar_MsgCodec.reset(token)
assert var.get() is orig
assert _ctxvar_MsgCodec.get() is orig
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'
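Mirroring the test usage earlier in this diff (`mk_codec(ipc_pld_spec=NamespacePath)` plus `with apply_codec(...)`), the task-scoped codec override works roughly like this sketch; the exact import path for the module internals is an assumption:

```python
from tractor.msg import NamespacePath
from tractor.msg._codec import (  # import path assumed
    MsgCodec,
    apply_codec,
    mk_codec,
)

# build a codec whose payload-spec only allows `NamespacePath` plds
nsp_codec: MsgCodec = mk_codec(ipc_pld_spec=NamespacePath)

with apply_codec(nsp_codec) as codec:
    # within this block the current task's IPC msgs are decoded
    # against the `NamespacePath` pld-spec only
    assert codec is nsp_codec
# on exit the prior codec is restored via the saved `Token`
```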


@@ -76,11 +76,9 @@ class NamespacePath(str):
return self._ref
@staticmethod
def _mk_fqnp(
ref: type|object,
) -> tuple[str, str]:
def _mk_fqnp(ref: type | object) -> tuple[str, str]:
'''
Generate a minimal `str` pair which describes a python
Generate a minimal ``str`` pair which describes a python
object's namespace path and object/type name.
In more precise terms something like:
@@ -89,9 +87,10 @@
of THIS type XD
'''
if isfunction(ref):
if (
isfunction(ref)
):
name: str = getattr(ref, '__name__')
mod_name: str = ref.__module__
elif ismethod(ref):
# build out the path manually i guess..?
@@ -100,19 +99,15 @@
type(ref.__self__).__name__,
ref.__func__.__name__,
])
mod_name: str = ref.__self__.__module__
else: # object or other?
# isinstance(ref, object)
# and not isfunction(ref)
name: str = type(ref).__name__
mod_name: str = ref.__module__
# TODO: return static value directly?
#
# fully qualified namespace path, tuple.
fqnp: tuple[str, str] = (
mod_name,
ref.__module__,
name,
)
return fqnp
@@ -120,7 +115,7 @@
@classmethod
def from_ref(
cls,
ref: type|object,
ref: type | object,
) -> NamespacePath:
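To round out the `NamespacePath` hunks above, a small usage sketch: `.from_ref()` and `.to_tuple()` both appear in this diff, while `.load_ref()` is assumed from the class's round-trip purpose.

```python
from tractor.msg import NamespacePath

def add(x: int, y: int) -> int:
    return x + y

nsp = NamespacePath.from_ref(add)       # -> '__main__:add', a `str` subclass
assert nsp.to_tuple() == ('__main__', 'add')
assert nsp.load_ref()(1, 2) == 3        # re-import and call the target
```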