Compare commits

...

15 Commits

Author SHA1 Message Date
Tyler Goodlet 9b4f580470 Go back to `ContextVar` for codec mgmt
Turns out we do want per-task inheritance, particularly if there's to be
per-`Context` dynamic mutation of the spec; we don't want a mutation in
some task to affect any parent/global setting.

Also, since we use a common "feeder task" in the rpc loop, we need to
offer a per-`Context` payload-decoder sys anyway in order to enable
per-task controls for inter-actor multi-task-ctx scenarios.
2024-04-18 16:24:59 -04:00
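(To illustrate the per-task isolation this commit relies on, here's a minimal
standalone sketch, editor's illustration rather than repo code: under `trio`
each task runs in a copy of its parent's `contextvars` context, so a `.set()`
in one task never leaks to siblings or to the parent/global scope.)

    from contextvars import ContextVar

    import trio

    _spec: ContextVar[str] = ContextVar('pld_spec', default='Any')

    async def apply_spec(spec: str) -> None:
        _spec.set(spec)  # mutation stays local to this task
        await trio.sleep(0)
        assert _spec.get() == spec

    async def main() -> None:
        async with trio.open_nursery() as tn:
            tn.start_soon(apply_spec, 'NamespacePath')
            tn.start_soon(apply_spec, 'LockStatus')

        # the parent's default is untouched by either child's `.set()`
        assert _spec.get() == 'Any'

    trio.run(main)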
Tyler Goodlet d51be2a36a Prototype new `Context` refinements
As per some newly added features and APIs:

- pass `portal: Portal` to `Actor.start_remote_task()` from
  `open_context_from_portal()` marking `Portal.open_context()` as
  always being the "parent" task side.

- add caller tracing via `.devx._code.CallerInfo`/`.find_caller_info()`
  called in `mk_context()` and (for now) a `__runtimeframe__: int = 2`
  inside `open_context_from_portal()` such that any code entering
  `Portal.open_context()` will be reported.

- pass in a new `._caller_info` attr which is used in 2 new meths:
  - `.repr_caller: str` for showing the name of the app-code func.
  - `.repr_api: str` for showing the API endpoint, which for now we
    just hardcode to `Portal.open_context()` since otherwise it would
    show the mod-level func name `open_context_from_portal()`.
  - use those new props ^ in the `._deliver_msg()` flow body log msg
    content for much clearer msg-flow tracing Bo

- add `Context._cancel_on_msgerr: bool` to toggle whether
  a delivered `MsgTypeError` should trigger a `._scope.cancel()` call.
  - also (temporarily) add separate `.cancel()` emissions for both cases
    as I work through hacking out the maybe `MsgType.pld: Raw` support.
2024-04-18 16:22:30 -04:00
Tyler Goodlet 3018187228 Tweak `current_actor()` failure msg 2024-04-18 15:41:06 -04:00
Tyler Goodlet e5f0b450cf Add some `bytes` annots 2024-04-18 15:40:26 -04:00
Tyler Goodlet 4aa24f8518 TOSQUASH 77a15eb use `DebugStatus` in `._rpc` 2024-04-18 15:18:29 -04:00
Tyler Goodlet d2f6428e46 Annotate nursery and portal methods for `CallerInfo` scanning 2024-04-18 15:17:50 -04:00
Tyler Goodlet 5439060cd3 Start a `devx._code` mod
Starting with a little sub-sys for tracing caller frames by marking them
with a dunder var (`__runtimeframe__` by default) and then scanning for
that frame such that code that is *calling* our APIs can be reported
easily in logging / tracing output.

New APIs:
- `find_caller_info()` which does the scan and delivers a,
- `CallerInfo` which attempts to expose both the runtime frame-info
  and frame of the caller func along with `NamespacePath` properties.

Probably going to re-implement the dunder var bit as a decorator later
so we can bind in the literal func-object ref instead of trying to look
it up with `get_class_from_frame()`, since it's kinda hacky/non-general
and def doesn't work for closure funcs..
2024-04-18 15:12:32 -04:00
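(The gist of the dunder-var marking technique, as a tiny standalone sketch
with illustrative names, not the repo impl:)

    import inspect

    def find_api_caller(dunder: str = '__runtimeframe__') -> str:
        # scan up the callstack for a frame marked with the dunder var,
        # then report the frame one level above it: the "user app code".
        for fi in inspect.stack():
            if fi.frame.f_locals.get(dunder):
                return fi.frame.f_back.f_code.co_name
        return '<unknown>'

    def some_api() -> str:
        __runtimeframe__: int = 1  # noqa: marks this as a "runtime frame"
        return find_api_caller()

    def app_code() -> str:
        return some_api()

    assert app_code() == 'app_code'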
Tyler Goodlet 7372404d76 `NamespacePath._mk_fqnp()` handle `__mod__` for methods
Need to use `__self__.__module__` in the method case i guess..
2024-04-18 15:10:27 -04:00
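(A quick check of the attrs involved, editor's example: for a bound method
the defining class's module is reachable via the instance, i.e.
`ref.__self__.__module__`, since instance attr lookup falls through to the
class.)

    class Spam:
        def eggs(self) -> None:
            ...

    meth = Spam().eggs  # a bound method
    fqnp = (
        meth.__self__.__module__,  # module where `Spam` lives
        f'{type(meth.__self__).__name__}.{meth.__func__.__name__}',
    )
    assert fqnp == (__name__, 'Spam.eggs')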
Tyler Goodlet 77a15ebf19 Use `DebugStatus` around subactor lock requests
Breaks out all the (sub)actor local conc primitives from `Lock` (which
is now only used in and by the root actor) such that there's an explicit
distinction between a task that's "consuming" the `Lock` (remotely) vs.
the root-side service tasks which do the actual acquire on behalf of the
requesters.

`DebugStatus` changeover details:
------ - ------
- move all the actor-local vars over to `DebugStatus`, including:
  - move `_trio_handler` and `_orig_sigint_handler`
  - `local_task_in_debug` now `repl_task`
  - `_debugger_request_cs` now `req_cs`
  - `local_pdb_complete` now `repl_release`
- drop all ^ fields from `Lock.repr()` obvi..
- move over the `.[un]shield_sigint()` and
  `.is_main_trio_thread()` methods.
- add some new attrs/meths:
  - `DebugStatus.repl` for the currently running `Pdb` in-actor
    singleton.
  - `.repr()` for pprint of state (like `Lock`).
- Note: that even when a root-actor task is in REPL, the `DebugStatus`
  is still used for certain actor-local state mgmt, such as SIGINT
  handler shielding.
- obvi change all lock-requester code bits to now use a `DebugStatus` in
  their local actor-state instead of `Lock`, i.e. change usage from
  `Lock` in `._runtime` and `._root`.
- use the new `Lock.get_locking_task_cs()` API when checking for
  a sub-in-debug from `._runtime.Actor._stream_handler()`.

Unrelated to topic-at-hand tweaks:
------ - ------
- drop the commented bits about hiding `@[a]cm` stack frames from
  `_debug.pause()` and simplify to only one block with the `shield`
  passthrough since we already solved the issue with cancel-scopes using
  `@pdbp.hideframe` B)
  - this includes all the extra logging about the extra frame for the
    user (good thing i put in that wasted effort back then eh..)
- put the `try/except BaseException` with `log.exception()` around the
  whole of `._pause()` to ensure we don't miss in-func errors which can
  cause hangs..
- allow passing in `portal: Portal` to
  `Actor.start_remote_task()` such that `Portal` task spawning methods
  are always denoted correctly in terms of `Context.side`.
- lotsa logging tweaks, decreasing a bit of noise from `.runtime()`s.
2024-04-18 13:53:08 -04:00
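(A rough sketch of the resulting split; the attr names follow the rename
list above, everything else is illustrative rather than the in-repo code:)

    import trio

    class Lock:
        # root-actor-only: tree-wide TTY mutex + request bookkeeping
        _blocked: set[tuple[str, int]] = set()  # dupe-request guard
        _locking_task_cs: trio.CancelScope|None = None

        @classmethod
        def get_locking_task_cs(cls) -> trio.CancelScope|None:
            return cls._locking_task_cs

    class DebugStatus:
        # per-(sub)actor local debug state
        repl_task = None                          # was `local_task_in_debug`
        req_cs: trio.CancelScope|None = None      # was `_debugger_request_cs`
        repl_release: trio.Event|None = None      # was `local_pdb_complete`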
Tyler Goodlet d0e7610073 The src error to `_raise_from_no_key_in_msg()` is always an attr-error now! 2024-04-17 23:19:31 -04:00
Tyler Goodlet a73b24cf4a First draft, sub-msg-spec for debugger `Lock` sys
Since it's totes possible to have a spec applied that won't permit
`str`s, might as well formalize a small msg set for subactors to request
the tree-wide TTY `Lock`.

BTW, I'm prolly not going into every single change here in this first
WIP since there's still a variety of broken stuff, mostly to do with
races on the codec apply being done in a `trio.lowlevel.RunVar`; it
should be re-done with a `ContextVar` such that each task does NOT
mutate the global setting..

New msg set and usage is simply:
- `LockStatus` which is the response msg delivered from `lock_tty_for_child()`
- `LockRelease` a one-off request msg from the subactor to drop the
  `Lock` from a `MsgStream.send()`.
- use these msgs throughout the root and sub sides of the locking
  ctx funcs: `lock_tty_for_child()` & `wait_for_parent_stdin_hijack()`

The codec is now applied in both the root and sub `Lock` request tasks:
- for root inside `lock_tty_for_child()` before the `.started()`.
- for subs, inside `wait_for_parent_stdin_hijack()` since we only want
  to affect the codec *for the locking task*.
  - (hence the need for ctx-var as mentioned above but currently this
    can cause races which will break against other app tasks competing
    for the codec setting).
- add an `apply_debug_codec()` helper for use in both cases.
- add more detailed logging to both the root and sub side of `Lock`
  requesting funcs, including requiring that the sub-side task "uid"
  (a `tuple[str, int]` of `(trio.Task.name, id(trio.Task))`) be
  provided (more on this later).

A main issue discovered while proto-testing all this was the ability of
a sub to "double lock" (leading to self-deadlock) via an error in
`wait_for_parent_stdin_hijack()` which, for ex., can happen in debug
mode via crash handling of a `MsgTypeError` received from the root
during a codec applied msg-spec race! Originally I was attempting to
solve this by making the SIGINT override handler more resilient but this
case is somewhat impossible to detect by an external root task other
than by checking for duplicate ownership via the new `subactor_task_uid`.
=> SO NOW, we always stick the current task uid in the
   `Lock._blocked: set` and raise an RTE (`RuntimeError`) on a double request by the same
   remote task.

Included is a variety of small refinements:
- finally figured out how to mark a variety of `.__exit__()` frames with
  `pdbp.hideframe()` to actually hide them B)
- add cls methods around managing `Lock._locking_task_cs` from root only.
- re-org all the `Lock` attrs into those only used in root vs. subactors
  and proto-prep a new `DebugStatus` actor-singleton to be used in subs.
- add a `Lock.repr()` to contextually print the current conc primitives.
- rename our `Pdb`-subtype to `PdbREPL`.
- rigor out the SIGINT handler a bit, originally to try and hack-solve
  the double-lock issue mentioned above, but now just with better
  logging and logic for most (all?) possible hang cases that should be
  hang-recoverable after enough ctrl-c mashing by the user.. well
  hopefully:
  - using `Lock.repr()` for both root and sub cases.
  - lots more `log.warn()`s and handler reversions on stale lock or cs
    detection.
- factor `._pause()` impl a little better moving the actual repl entry
  to a new `_enter_repl_sync()` (originally for easier wrapping in the
  sub case with `apply_codec()`).
2024-04-16 13:25:19 -04:00
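(For reference, such a minimal lock-msg set might look roughly like the
following `msgspec` structs; the field names here are illustrative
assumptions, not the in-repo definitions:)

    from msgspec import Struct

    class LockStatus(Struct, tag=True):
        # response msg delivered from `lock_tty_for_child()`
        subactor_uid: tuple[str, str]
        cid: str
        locked: bool

    class LockRelease(Struct, tag=True):
        # one-off request msg from the subactor to drop the `Lock`
        subactor_uid: tuple[str, str]
        cid: str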
Tyler Goodlet 5dfff3f75a Tweak a couple more log message fmts 2024-04-15 15:20:00 -04:00
Tyler Goodlet d4155396bf Relay `SIGUSR1` to subactors for `stackscope` tracing
Since obvi we don't want to only see the trace in the root most of the
time ;)

Currently the sig keeps firing twice in the root though, and i'm not
sure why yet..
2024-04-14 19:52:44 -04:00
Tyler Goodlet 3869e91b19 More msg-spec tests tidying
- Drop `test_msg_spec_xor_pld_spec()` since we no longer support
  `ipc_msg_spec` arg to `mk_codec()`.
- Expect `MsgTypeError`s around `.open_context()` calls when
  `add_codec_hooks == False`.
- toss in some `.pause()` points in the subactor ctx body whilst hacking
  out a `.pld` protocol for debug mode TTY locking.
2024-04-14 19:50:09 -04:00
Tyler Goodlet 829dfa7520 Add default rtv for `use_greenback: bool = False` 2024-04-14 19:41:29 -04:00
17 changed files with 1444 additions and 559 deletions


@@ -14,19 +14,20 @@ from typing import (
 from contextvars import (
     Context,
 )
+# from inspect import Parameter
 from msgspec import (
     structs,
     msgpack,
+    # defstruct,
     Struct,
     ValidationError,
 )
 import pytest
 import tractor
-from tractor import _state
+from tractor import (
+    _state,
+    MsgTypeError,
+)
 from tractor.msg import (
     _codec,
     _ctxvar_MsgCodec,
@@ -47,21 +48,6 @@ from tractor.msg.types import (
 import trio

-def test_msg_spec_xor_pld_spec():
-    '''
-    If the `.msg.types.Msg`-set is overridden, we
-    can't also support a `Msg.pld` spec.
-    '''
-    # apply custom hooks and set a `Decoder` which only
-    # loads `NamespacePath` types.
-    with pytest.raises(RuntimeError):
-        mk_codec(
-            ipc_msg_spec=Any,
-            ipc_pld_spec=NamespacePath,
-        )
-
 def mk_custom_codec(
     pld_spec: Union[Type]|Any,
     add_hooks: bool,
@@ -134,7 +120,9 @@ def mk_custom_codec(
             f'{uid}\n'
             'FAILED DECODE\n'
             f'type-> {obj_type}\n'
-            f'obj-arg-> `{obj}`: {type(obj)}\n'
+            f'obj-arg-> `{obj}`: {type(obj)}\n\n'
+            f'current codec:\n'
+            f'{current_codec()}\n'
         )
         # TODO: figure out the ignore subsys for this!
         # -[ ] option whether to defense-relay backc the msg
@@ -409,7 +397,9 @@ async def send_back_values(
         pld_spec=ipc_pld_spec,
         add_hooks=add_hooks,
     )
-    with apply_codec(nsp_codec) as codec:
+    with (
+        apply_codec(nsp_codec) as codec,
+    ):
         chk_codec_applied(
             expect_codec=nsp_codec,
             enter_value=codec,
@@ -459,7 +449,7 @@ async def send_back_values(
                 # XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
                 # `str` handling! or special debug mode IPC
                 # msgs!
-                # await tractor.pause()
+                await tractor.pause()

                 raise RuntimeError(
                     f'NOT-EXPECTED able to roundtrip value given spec:\n'
@@ -470,7 +460,8 @@ async def send_back_values(
                 break  # move on to streaming block..

             except tractor.MsgTypeError:
-                # await tractor.pause()
+                await tractor.pause()
+
                 if expect_send:
                     raise RuntimeError(
                         f'EXPECTED to `.started()` value given spec:\n'
@@ -652,12 +643,42 @@ def test_codec_hooks_mod(
         pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)

+        # XXX should raise an mte (`MsgTypeError`)
+        # when `add_codec_hooks == False` bc the input
+        # `expect_ipc_send` kwarg has a nsp which can't be
+        # serialized!
+        #
+        # TODO:can we ensure this happens from the
+        # `Return`-side (aka the sub) as well?
+        if not add_codec_hooks:
+            try:
+                async with p.open_context(
+                    send_back_values,
+                    expect_debug=debug_mode,
+                    pld_spec_type_strs=pld_spec_type_strs,
+                    add_hooks=add_codec_hooks,
+                    started_msg_bytes=nsp_codec.encode(expected_started),
+
+                    # XXX NOTE bc we send a `NamespacePath` in this kwarg
+                    expect_ipc_send=expect_ipc_send,
+
+                ) as (ctx, first):
+                    pytest.fail('ctx should fail to open without custom enc_hook!?')
+
+            # this test passes bc we can go no further!
+            except MsgTypeError:
+                # teardown nursery
+                await p.cancel_actor()
+                return
+
         # TODO: send the original nsp here and
         # test with `limit_msg_spec()` above?
         # await tractor.pause()
         print('PARENT opening IPC ctx!\n')
         async with (
+
+            # XXX should raise an mte (`MsgTypeError`)
+            # when `add_codec_hooks == False`..
             p.open_context(
                 send_back_values,
                 expect_debug=debug_mode,


@@ -26,6 +26,7 @@ disjoint, parallel executing tasks in separate actors.
 from __future__ import annotations
 from collections import deque
 from contextlib import asynccontextmanager as acm
+from contextvars import ContextVar
 from dataclasses import (
     dataclass,
     field,
@@ -56,6 +57,7 @@ from ._exceptions import (
 )
 from .log import get_logger
 from .msg import (
+    _codec,
     Error,
     MsgType,
     MsgCodec,
@@ -80,6 +82,9 @@ if TYPE_CHECKING:
     from ._portal import Portal
     from ._runtime import Actor
     from ._ipc import MsgTransport
+    from .devx._code import (
+        CallerInfo,
+    )

 log = get_logger(__name__)
@@ -499,6 +504,18 @@ class Context:
     _started_called: bool = False
     _stream_opened: bool = False
     _stream: MsgStream|None = None
+    _pld_codec_var: ContextVar[MsgCodec] = ContextVar(
+        'pld_codec',
+        default=_codec._def_msgspec_codec,  # i.e. `Any`-payloads
+    )
+
+    @property
+    def pld_codec(self) -> MsgCodec|None:
+        return self._pld_codec_var.get()
+
+    # caller of `Portal.open_context()` for
+    # logging purposes mostly
+    _caller_info: CallerInfo|None = None

     # overrun handling machinery
     # NOTE: none of this provides "backpressure" to the remote
@@ -525,6 +542,7 @@ class Context:
     # TODO: figure out how we can enforce this without losing our minds..
     _strict_started: bool = False
+    _cancel_on_msgerr: bool = True

     def __str__(self) -> str:
         ds: str = '='
@@ -857,6 +875,7 @@ class Context:
         # TODO: never do this right?
         # if self._remote_error:
         #     return
+        peer_side: str = self.peer_side(self.side)

         # XXX: denote and set the remote side's error so that
         # after we cancel whatever task is the opener of this
@@ -864,14 +883,15 @@ class Context:
         # appropriately.
         log.runtime(
             'Setting remote error for ctx\n\n'
-            f'<= remote ctx uid: {self.chan.uid}\n'
-            f'=>{error}'
+            f'<= {peer_side!r}: {self.chan.uid}\n'
+            f'=> {self.side!r}\n\n'
+            f'{error}'
         )
         self._remote_error: BaseException = error

         # self-cancel (ack) or,
         # peer propagated remote cancellation.
-        msgtyperr: bool = False
+        msgerr: bool = False

         if isinstance(error, ContextCancelled):
             whom: str = (
@@ -884,7 +904,7 @@ class Context:
             )

         elif isinstance(error, MsgTypeError):
-            msgtyperr = True
+            msgerr = True
             peer_side: str = self.peer_side(self.side)
             log.error(
                 f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
@@ -935,13 +955,24 @@ class Context:
             and not self._is_self_cancelled()
             and not cs.cancel_called
             and not cs.cancelled_caught
-            and not msgtyperr
+            and (
+                msgerr
+                and
+                # NOTE: allow user to config not cancelling the
+                # local scope on `MsgTypeError`s
+                self._cancel_on_msgerr
+            )
         ):
             # TODO: it'd sure be handy to inject our own
             # `trio.Cancelled` subtype here ;)
             # https://github.com/goodboy/tractor/issues/368
+            log.cancel('Cancelling local `.open_context()` scope!')
             self._scope.cancel()
+        else:
+            log.cancel('NOT cancelling local `.open_context()` scope!')

         # TODO: maybe we should also call `._res_scope.cancel()` if it
         # exists to support cancelling any drain loop hangs?
@@ -966,9 +997,7 @@ class Context:
     dmaddr = dst_maddr

     @property
-    def repr_rpc(
-        self,
-    ) -> str:
+    def repr_rpc(self) -> str:
         # TODO: how to show the transport interchange fmt?
         # codec: str = self.chan.transport.codec_key
         outcome_str: str = self.repr_outcome(
@@ -980,6 +1009,27 @@ class Context:
             f'{self._nsf}() -> {outcome_str}:'
         )

+    @property
+    def repr_caller(self) -> str:
+        ci: CallerInfo|None = self._caller_info
+        if ci:
+            return (
+                f'{ci.caller_nsp}()'
+                # f'|_api: {ci.api_nsp}'
+            )
+
+        return '<UNKNOWN caller-frame>'
+
+    @property
+    def repr_api(self) -> str:
+        # ci: CallerInfo|None = self._caller_info
+        # if ci:
+        #     return (
+        #         f'{ci.api_nsp}()\n'
+        #     )
+        return 'Portal.open_context()'
+
     async def cancel(
         self,
         timeout: float = 0.616,
@@ -1184,8 +1234,9 @@ class Context:
         )

         # NOTE: in one way streaming this only happens on the
-        # caller side inside `Actor.start_remote_task()` so if you try
-        # to send a stop from the caller to the callee in the
+        # parent-ctx-task side (on the side that calls
+        # `Actor.start_remote_task()`) so if you try to send
+        # a stop from the caller to the callee in the
         # single-direction-stream case you'll get a lookup error
         # currently.
         ctx: Context = actor.get_context(
@@ -1850,6 +1901,19 @@ class Context:
         send_chan: trio.MemorySendChannel = self._send_chan
         nsf: NamespacePath = self._nsf

+        side: str = self.side
+        if side == 'child':
+            assert not self._portal
+
+        peer_side: str = self.peer_side(side)
+
+        flow_body: str = (
+            f'<= peer {peer_side!r}: {from_uid}\n'
+            f'  |_<{nsf}()>\n\n'
+
+            f'=> {side!r}: {self._task}\n'
+            f'  |_<{self.repr_api} @ {self.repr_caller}>\n\n'
+        )

         re: Exception|None
         if re := unpack_error(
             msg,
@@ -1860,18 +1924,10 @@ class Context:
         else:
             log_meth = log.runtime

-        side: str = self.side
-
-        peer_side: str = self.peer_side(side)
-
         log_meth(
             f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
-            f'<= peer {peer_side!r}: {from_uid}\n'
-            f'  |_ {nsf}()\n\n'
-
-            f'=> {side!r} cid: {cid}\n'
-            f'  |_{self._task}\n\n'
+            f'{flow_body}'
             f'{pformat(re)}\n'
         )
@@ -1884,30 +1940,27 @@ class Context:
         # or `RemoteActorError`).
         self._maybe_cancel_and_set_remote_error(re)

-        # XXX only case where returning early is fine!
+        # TODO: expose as mod func instead!
         structfmt = pretty_struct.Struct.pformat
         if self._in_overrun:
             log.warning(
-                f'Queueing OVERRUN msg on caller task:\n'
-                f'<= peer: {from_uid}\n'
-                f'  |_ {nsf}()\n\n'
-                f'=> cid: {cid}\n'
-                f'  |_{self._task}\n\n'
+                f'Queueing OVERRUN msg on caller task:\n\n'
+                f'{flow_body}'
                 f'{structfmt(msg)}\n'
             )
             self._overflow_q.append(msg)
+
+            # XXX NOTE XXX
+            # overrun is the ONLY case where returning early is fine!
             return False

         try:
             log.runtime(
-                f'Delivering msg from IPC ctx:\n'
-                f'<= {from_uid}\n'
-                f'  |_ {nsf}()\n\n'
-                f'=> {self._task}\n'
-                f'  |_cid={self.cid}\n\n'
+                f'Delivering msg from IPC ctx:\n\n'
+                f'{flow_body}'
                 f'{structfmt(msg)}\n'
             )
@@ -1939,6 +1992,7 @@ class Context:
                 f'cid: {self.cid}\n'
                 'Failed to deliver msg:\n'
                 f'send_chan: {send_chan}\n\n'
+
                 f'{pformat(msg)}\n'
             )
             return False
@@ -2092,6 +2146,12 @@ async def open_context_from_portal(
     '''
     __tracebackhide__: bool = hide_tb

+    # denote this frame as a "runtime frame" for stack
+    # introspection where we report the caller code in logging
+    # and error message content.
+    # NOTE: 2 bc of the wrapping `@acm`
+    __runtimeframe__: int = 2  # noqa
+
     # conduct target func method structural checks
     if not inspect.iscoroutinefunction(func) and (
         getattr(func, '_tractor_contex_function', False)
@@ -2119,6 +2179,8 @@ async def open_context_from_portal(
         nsf=nsf,
         kwargs=kwargs,

+        portal=portal,
+
         # NOTE: it's imporant to expose this since you might
         # get the case where the parent who opened the context does
         # not open a stream until after some slow startup/init
@@ -2129,13 +2191,17 @@ async def open_context_from_portal(
         # place..
         allow_overruns=allow_overruns,
     )
-
-    # ASAP, so that `Context.side: str` can be determined for
-    # logging / tracing / debug!
-    ctx._portal: Portal = portal
     assert ctx._remote_func_type == 'context'
-    msg: Started = await ctx._recv_chan.receive()
+    assert ctx._caller_info

+    # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
+    # `Started`-msg any cancellation triggered
+    # in `._maybe_cancel_and_set_remote_error()` will
+    # NOT actually cancel the below line!
+    # -> it's expected that if there is an error in this phase of
+    # the dialog, the `Error` msg should be raised from the `msg`
+    # handling block below.
+    msg: Started = await ctx._recv_chan.receive()
     try:
         # the "first" value here is delivered by the callee's
         # ``Context.started()`` call.
@@ -2145,6 +2211,7 @@ async def open_context_from_portal(
     # except KeyError as src_error:
     except AttributeError as src_error:
+        log.exception('Raising from unexpected msg!\n')
         _raise_from_no_key_in_msg(
             ctx=ctx,
             msg=msg,
@@ -2570,7 +2637,6 @@ async def open_context_from_portal(
             None,
         )

-
 def mk_context(
     chan: Channel,
     cid: str,
@@ -2592,6 +2658,10 @@ def mk_context(
     recv_chan: trio.MemoryReceiveChannel
     send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)

+    # TODO: only scan caller-info if log level so high!
+    from .devx._code import find_caller_info
+    caller_info: CallerInfo|None = find_caller_info()
+
     ctx = Context(
         chan=chan,
         cid=cid,
@@ -2600,6 +2670,7 @@ def mk_context(
         _recv_chan=recv_chan,
         _nsf=nsf,
         _task=trio.lowlevel.current_task(),
+        _caller_info=caller_info,
         **kwargs,
     )
     # TODO: we can drop the old placeholder yah?
@@ -2610,7 +2681,11 @@ def mk_context(
 def context(func: Callable) -> Callable:
     '''
-    Mark an async function as a streaming routine with ``@context``.
+    Mark an (async) function as an SC-supervised, inter-`Actor`,
+    child-`trio.Task`, IPC endpoint otherwise known more
+    colloquially as a (RPC) "context".
+
+    Functions annotated the fundamental IPC endpoint type offered by `tractor`.

     '''
     # TODO: apply whatever solution ``mypy`` ends up picking for this:


@@ -935,7 +935,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
 def _raise_from_no_key_in_msg(
     ctx: Context,
     msg: MsgType,
-    src_err: KeyError,
+    src_err: AttributeError,
     log: StackLevelAdapter,  # caller specific `log` obj
     expect_msg: str = Yield,
@@ -994,7 +994,7 @@ def _raise_from_no_key_in_msg(
             ctx.chan,
             hide_tb=hide_tb,
-        ) from None
+        ) from src_err

     # `MsgStream` termination msg.
     # TODO: does it make more sense to pack

@@ -314,8 +314,7 @@ class MsgpackTCPStream(MsgTransport):
         while True:
             try:
-                header = await self.recv_stream.receive_exactly(4)
+                header: bytes = await self.recv_stream.receive_exactly(4)
             except (
                 ValueError,
                 ConnectionResetError,
@@ -337,8 +336,7 @@ class MsgpackTCPStream(MsgTransport):
             size, = struct.unpack("<I", header)
             log.transport(f'received header {size}')  # type: ignore
-
-            msg_bytes = await self.recv_stream.receive_exactly(size)
+            msg_bytes: bytes = await self.recv_stream.receive_exactly(size)

             log.transport(f"received {msg_bytes}")  # type: ignore
             try:


@@ -161,17 +161,18 @@ class Portal:
         self._expect_result = await self.actor.start_remote_task(
             self.channel,
             nsf=NamespacePath(f'{ns}:{func}'),
-            kwargs=kwargs
+            kwargs=kwargs,
+            portal=self,
         )

     async def _return_once(
         self,
         ctx: Context,
-    ) -> dict[str, Any]:
+    ) -> Return:

         assert ctx._remote_func_type == 'asyncfunc'  # single response
-        msg: dict = await ctx._recv_chan.receive()
+        msg: Return = await ctx._recv_chan.receive()
         return msg

     async def result(self) -> Any:
@@ -247,6 +248,8 @@ class Portal:
         purpose.

         '''
+        __runtimeframe__: int = 1  # noqa
+
         chan: Channel = self.channel
         if not chan.connected():
             log.runtime(
@@ -324,16 +327,18 @@ class Portal:
         internals!

         '''
+        __runtimeframe__: int = 1  # noqa
         nsf = NamespacePath(
             f'{namespace_path}:{function_name}'
         )
-        ctx = await self.actor.start_remote_task(
+        ctx: Context = await self.actor.start_remote_task(
             chan=self.channel,
             nsf=nsf,
             kwargs=kwargs,
+            portal=self,
         )
-        ctx._portal = self
-        msg = await self._return_once(ctx)
+        ctx._portal: Portal = self
+        msg: Return = await self._return_once(ctx)
         return _unwrap_msg(
             msg,
             self.channel,
@@ -384,6 +389,7 @@ class Portal:
             self.channel,
             nsf=nsf,
             kwargs=kwargs,
+            portal=self,
         )
         ctx._portal = self
         return _unwrap_msg(
@@ -398,6 +404,14 @@ class Portal:
         **kwargs,

     ) -> AsyncGenerator[MsgStream, None]:
+        '''
+        Legacy one-way streaming API.
+
+        TODO: re-impl on top `Portal.open_context()` + an async gen
+        around `Context.open_stream()`.
+
+        '''
+        __runtimeframe__: int = 1  # noqa

         if not inspect.isasyncgenfunction(async_gen_func):
             if not (
@@ -411,6 +425,7 @@ class Portal:
             self.channel,
             nsf=NamespacePath.from_ref(async_gen_func),
             kwargs=kwargs,
+            portal=self,
         )
         ctx._portal = self


@@ -135,7 +135,7 @@ async def open_root_actor(
     # attempt to retreive ``trio``'s sigint handler and stash it
     # on our debugger lock state.
-    _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
+    _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)

     # mark top most level process as root actor
     _state._runtime_vars['_is_root'] = True


@@ -814,7 +814,7 @@ async def process_messages(
     # should use it?
     # https://github.com/python-trio/trio/issues/467
     log.runtime(
-        'Entering IPC msg loop:\n'
+        'Entering RPC msg loop:\n'
         f'peer: {chan.uid}\n'
         f'|_{chan}\n'
     )
@@ -876,7 +876,7 @@ async def process_messages(
                         # XXX NOTE XXX don't start entire actor
                         # runtime cancellation if this actor is
                         # currently in debug mode!
-                        pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
+                        pdb_complete: trio.Event|None = _debug.DebugStatus.repl_release
                         if pdb_complete:
                             await pdb_complete.wait()
@@ -1073,7 +1073,7 @@ async def process_messages(
                 log.exception(message)
                 raise RuntimeError(message)

-            log.runtime(
+            log.transport(
                 'Waiting on next IPC msg from\n'
                 f'peer: {chan.uid}\n'
                 f'|_{chan}\n'


@@ -267,10 +267,13 @@ class Actor:
         self._listeners: list[trio.abc.Listener] = []
         self._parent_chan: Channel|None = None
         self._forkserver_info: tuple|None = None
+
+        # track each child/sub-actor in it's locally
+        # supervising nursery
         self._actoruid2nursery: dict[
-            tuple[str, str],
+            tuple[str, str],  # sub-`Actor.uid`
             ActorNursery|None,
-        ] = {}  # type: ignore # noqa
+        ] = {}

         # when provided, init the registry addresses property from
         # input via the validator.
@@ -659,12 +662,18 @@ class Actor:
             # TODO: NEEEDS TO BE TESTED!
             # actually, no idea if this ever even enters.. XD
+            #
+            # XXX => YES IT DOES, when i was testing ctl-c
+            # from broken debug TTY locking due to
+            # msg-spec races on application using RunVar...
             pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
             if (
                 pdb_user_uid
                 and local_nursery
             ):
-                entry: tuple|None = local_nursery._children.get(pdb_user_uid)
+                entry: tuple|None = local_nursery._children.get(
+                    tuple(pdb_user_uid)
+                )
                 if entry:
                     proc: trio.Process
                     _, proc, _ = entry
@@ -674,10 +683,10 @@ class Actor:
                         and poll() is None
                     ):
                         log.cancel(
-                            'Root actor reports no-more-peers, BUT '
+                            'Root actor reports no-more-peers, BUT\n'
                             'a DISCONNECTED child still has the debug '
-                            'lock!\n'
-                            f'root uid: {self.uid}\n'
+                            'lock!\n\n'
+                            # f'root uid: {self.uid}\n'
                             f'last disconnected child uid: {uid}\n'
                             f'locking child uid: {pdb_user_uid}\n'
                         )
@@ -703,9 +712,8 @@ class Actor:
                 # if a now stale local task has the TTY lock still
                 # we cancel it to allow servicing other requests for
                 # the lock.
-                db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
                 if (
-                    db_cs
+                    (db_cs := pdb_lock.get_locking_task_cs())
                     and not db_cs.cancel_called
                     and uid == pdb_user_uid
                 ):
@@ -742,7 +750,7 @@ class Actor:
         except KeyError:
             log.warning(
                 'Ignoring invalid IPC ctx msg!\n\n'
-                f'<= sender: {uid}\n'
+                f'<= sender: {uid}\n\n'
                 # XXX don't need right since it's always in msg?
                 # f'=> cid: {cid}\n\n'
@@ -796,7 +804,7 @@ class Actor:
                 cid,
                 # side,
             )]
-            log.runtime(
+            log.debug(
                 f'Retreived cached IPC ctx for\n'
                 f'peer: {chan.uid}\n'
                 f'cid:{cid}\n'
@@ -835,10 +843,14 @@ class Actor:
         nsf: NamespacePath,
         kwargs: dict,

+        # determines `Context.side: str`
+        portal: Portal|None = None,
+
         # IPC channel config
         msg_buffer_size: int|None = None,
         allow_overruns: bool = False,
         load_nsf: bool = False,
+        ack_timeout: float = 3,

     ) -> Context:
         '''
@@ -863,10 +875,12 @@ class Actor:
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
+        ctx._portal = portal
         if (
             'self' in nsf
-            or not load_nsf
+            or
+            not load_nsf
         ):
             ns, _, func = nsf.partition(':')
         else:
@@ -874,42 +888,29 @@ class Actor:
             # -[ ] but, how to do `self:<Actor.meth>`??
             ns, func = nsf.to_tuple()

-        log.runtime(
-            'Sending cmd to\n'
-            f'peer: {chan.uid} => \n'
-            '\n'
-            f'=> {ns}.{func}({kwargs})\n'
-        )
-        await chan.send(
-            msgtypes.Start(
-                ns=ns,
-                func=func,
-                kwargs=kwargs,
-                uid=self.uid,
-                cid=cid,
-            )
-        )
-        # {'cmd': (
-        #     ns,
-        #     func,
-        #     kwargs,
-        #     self.uid,
-        #     cid,
-        # )}
-        # )
-
-        # Wait on first response msg and validate; this should be
-        # immediate.
-        # first_msg: dict = await ctx._recv_chan.receive()
-        # functype: str = first_msg.get('functype')
-        first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
+        msg = msgtypes.Start(
+            ns=ns,
+            func=func,
+            kwargs=kwargs,
+            uid=self.uid,
+            cid=cid,
+        )
+        log.runtime(
+            'Sending RPC start msg\n\n'
+            f'=> peer: {chan.uid}\n'
+            f'  |_ {ns}.{func}({kwargs})\n'
+        )
+        await chan.send(msg)
+
+        # NOTE wait on first `StartAck` response msg and validate;
+        # this should be immediate and does not (yet) wait for the
+        # remote child task to sync via `Context.started()`.
+        with trio.fail_after(ack_timeout):
+            first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
         try:
             functype: str = first_msg.functype
         except AttributeError:
             raise unpack_error(first_msg, chan)
-        # if 'error' in first_msg:
-        #     raise unpack_error(first_msg, chan)

         if functype not in (
             'asyncfunc',
@@ -917,7 +918,7 @@ class Actor:
             'context',
         ):
             raise ValueError(
-                f'{first_msg} is an invalid response packet?'
+                f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
             )

         ctx._remote_func_type = functype
@@ -1162,7 +1163,7 @@ class Actor:
         # kill any debugger request task to avoid deadlock
         # with the root actor in this tree
-        dbcs = _debug.Lock._debugger_request_cs
+        dbcs = _debug.DebugStatus.req_cs
         if dbcs is not None:
             msg += (
                 '>> Cancelling active debugger request..\n'
@@ -1237,9 +1238,9 @@ class Actor:
         except KeyError:
             # NOTE: during msging race conditions this will often
             # emit, some examples:
-            # - callee returns a result before cancel-msg/ctxc-raised
-            # - callee self raises ctxc before caller send request,
-            # - callee errors prior to cancel req.
+            # - child returns a result before cancel-msg/ctxc-raised
+            # - child self raises ctxc before parent send request,
+            # - child errors prior to cancel req.
             log.cancel(
                 'Cancel request invalid, RPC task already completed?\n\n'
                 f'<= canceller: {requesting_uid}\n\n'
@@ -1305,12 +1306,12 @@ class Actor:
             f'|_{ctx}\n'
         )
         log.runtime(
-            'Waiting on RPC task to cancel\n'
+            'Waiting on RPC task to cancel\n\n'
             f'{flow_info}'
         )
         await is_complete.wait()
         log.runtime(
-            f'Sucessfully cancelled RPC task\n'
+            f'Sucessfully cancelled RPC task\n\n'
             f'{flow_info}'
         )
         return True
@@ -1536,8 +1537,8 @@ async def async_main(
     '''
     # attempt to retreive ``trio``'s sigint handler and stash it
-    # on our debugger lock state.
-    _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
+    # on our debugger state.
+    _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)

     is_registered: bool = False
     try:

@@ -30,11 +30,16 @@ if TYPE_CHECKING:
 _current_actor: Actor|None = None  # type: ignore # noqa
 _last_actor_terminated: Actor|None = None
+
+# TODO: mk this a `msgspec.Struct`!
 _runtime_vars: dict[str, Any] = {
     '_debug_mode': False,
     '_is_root': False,
     '_root_mailbox': (None, None),
     '_registry_addrs': [],
+
+    # for `breakpoint()` support
+    'use_greenback': False,
 }
@@ -61,7 +66,7 @@ def current_actor(
         err_on_no_runtime
         and _current_actor is None
     ):
-        msg: str = 'No local actor has been initialized yet'
+        msg: str = 'No local actor has been initialized yet?\n'
         from ._exceptions import NoRuntime

         if last := last_actor():
@@ -74,8 +79,8 @@ def current_actor(
         # this process.
         else:
             msg += (
-                'No last actor found?\n'
-                'Did you forget to open one of:\n\n'
+                # 'No last actor found?\n'
+                '\nDid you forget to call one of,\n'
                 '- `tractor.open_root_actor()`\n'
                 '- `tractor.open_nursery()`\n'
             )


@@ -377,14 +377,17 @@ class MsgStream(trio.abc.Channel):
         # await rx_chan.aclose()

         if not self._eoc:
-            log.cancel(
-                'Stream closed by self before it received an EoC?\n'
-                'Setting eoc manually..\n..'
-            )
-            self._eoc: bool = trio.EndOfChannel(
-                f'Context stream closed by self({self._ctx.side})\n'
+            message: str = (
+                f'Context stream closed by {self._ctx.side!r}\n'
                 f'|_{self}\n'
             )
+            log.cancel(
+                'Stream self-closed before receiving EoC\n\n'
+                +
+                message
+            )
+            self._eoc = trio.EndOfChannel(message)

         # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
         # => NO, DEFINITELY NOT! <=
         # if we're a bi-dir ``MsgStream`` BECAUSE this same


@@ -131,7 +131,12 @@ class ActorNursery:
         "main task" besides the runtime.

         '''
-        loglevel = loglevel or self._actor.loglevel or get_loglevel()
+        __runtimeframe__: int = 1  # noqa
+        loglevel: str = (
+            loglevel
+            or self._actor.loglevel
+            or get_loglevel()
+        )

         # configure and pass runtime state
         _rtv = _state._runtime_vars.copy()
@@ -209,6 +214,7 @@ class ActorNursery:
         the actor is terminated.

         '''
+        __runtimeframe__: int = 1  # noqa
         mod_path: str = fn.__module__

         if name is None:
@@ -257,6 +263,7 @@ class ActorNursery:
         directly without any far end graceful ``trio`` cancellation.

         '''
+        __runtimeframe__: int = 1  # noqa
         self.cancelled = True

         # TODO: impl a repr for spawn more compact


@@ -27,7 +27,6 @@ from ._debug import (
     pause as pause,
     pause_from_sync as pause_from_sync,
     shield_sigint_handler as shield_sigint_handler,
-    MultiActorPdb as MultiActorPdb,
     open_crash_handler as open_crash_handler,
     maybe_open_crash_handler as maybe_open_crash_handler,
     post_mortem as post_mortem,


@@ -0,0 +1,177 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
Tools for code-object annotation, introspection and mutation
as it pertains to improving the grok-ability of our runtime!

'''
from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
from types import (
    FrameType,
    FunctionType,
    MethodType,
    # CodeType,
)
from typing import (
    # Any,
    Callable,
    # TYPE_CHECKING,
    Type,
)

from tractor.msg import (
    pretty_struct,
    NamespacePath,
)


# TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func
# obj..
def get_class_from_frame(fr: FrameType) -> (
    FunctionType
    |MethodType
):
    '''
    Attempt to get the function (or method) reference
    from a given `FrameType`.

    Verbatim from an SO:
    https://stackoverflow.com/a/2220759

    '''
    args, _, _, value_dict = inspect.getargvalues(fr)

    # we check the first parameter for the frame function is
    # named 'self'
    if (
        len(args)
        and
        # TODO: other cases for `@classmethod` etc..?)
        args[0] == 'self'
    ):
        # in that case, 'self' will be referenced in value_dict
        instance: object = value_dict.get('self')
        if instance:
            # return its class
            return getattr(
                instance,
                '__class__',
                None,
            )

    # return None otherwise
    return None


def func_ref_from_frame(
    frame: FrameType,
) -> Callable:
    func_name: str = frame.f_code.co_name
    try:
        return frame.f_globals[func_name]
    except KeyError:
        cls: Type|None = get_class_from_frame(frame)
        if cls:
            return getattr(
                cls,
                func_name,
            )


# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
#      meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
    rt_fi: inspect.FrameInfo
    call_frame: FrameType

    @property
    def api_func_ref(self) -> Callable|None:
        return func_ref_from_frame(self.rt_fi.frame)

    @property
    def api_nsp(self) -> NamespacePath|None:
        func: FunctionType = self.api_func_ref
        if func:
            return NamespacePath.from_ref(func)

        return '<unknown>'

    @property
    def caller_func_ref(self) -> Callable|None:
        return func_ref_from_frame(self.call_frame)

    @property
    def caller_nsp(self) -> NamespacePath|None:
        func: FunctionType = self.caller_func_ref
        if func:
            return NamespacePath.from_ref(func)

        return '<unknown>'


def find_caller_info(
    dunder_var: str = '__runtimeframe__',
    iframes: int = 1,
    check_frame_depth: bool = True,

) -> CallerInfo|None:
    '''
    Scan up the callstack for a frame with a `dunder_var: str` variable
    and return the `iframes` frames above it.

    By default we scan for a `__runtimeframe__` scope var which
    denotes a `tractor` API above which (one frame up) is "user
    app code" which "called into" the `tractor` method or func.

    TODO: ex with `Portal.open_context()`

    '''
    # TODO: use this instead?
    # https://docs.python.org/3/library/inspect.html#inspect.getouterframes
    frames: list[inspect.FrameInfo] = inspect.stack()
    for fi in frames:
        assert (
            fi.function
            ==
            fi.frame.f_code.co_name
        )
        this_frame: FrameType = fi.frame
        dunder_val: int|None = this_frame.f_locals.get(dunder_var)
        if dunder_val:
            go_up_iframes: int = (
                dunder_val  # could be 0 or `True` i guess?
                or
                iframes
            )
            rt_frame: FrameType = fi.frame
            call_frame = rt_frame
            for i in range(go_up_iframes):
                call_frame = call_frame.f_back

            return CallerInfo(
                rt_fi=fi,
                call_frame=call_frame,
            )

    return None
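(Example usage of the above, assuming the module is importable as
`tractor.devx._code`: mark an API frame with the dunder var, then scan
for the app-code caller.)

    from tractor.devx._code import CallerInfo, find_caller_info

    def my_api() -> CallerInfo|None:
        __runtimeframe__: int = 1  # noqa
        return find_caller_info()

    def app() -> None:
        ci: CallerInfo|None = my_api()
        if ci:
            # namespace path of `app()`, the caller of `my_api()`
            print(ci.caller_nsp)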

File diff suppressed because it is too large


@@ -23,12 +23,31 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
 disjoint, parallel executing tasks in separate actors.

 '''
+from __future__ import annotations
+import multiprocessing as mp
 from signal import (
     signal,
     SIGUSR1,
 )
+import traceback
+from typing import TYPE_CHECKING

 import trio
+from tractor import (
+    _state,
+    log as logmod,
+)

+log = logmod.get_logger(__name__)
+
+
+if TYPE_CHECKING:
+    from tractor._spawn import ProcessType
+    from tractor import (
+        Actor,
+        ActorNursery,
+    )


 @trio.lowlevel.disable_ki_protection
 def dump_task_tree() -> None:
@@ -41,9 +60,15 @@ def dump_task_tree() -> None:
             recurse_child_tasks=True
         )
     )
-    log = get_console_log('cancel')
+    log = get_console_log(
+        name=__name__,
+        level='cancel',
+    )
+    actor: Actor = _state.current_actor()
     log.pdb(
-        f'Dumping `stackscope` tree:\n\n'
+        f'Dumping `stackscope` tree for actor\n'
+        f'{actor.name}: {actor}\n'
+        f' |_{mp.current_process()}\n\n'
         f'{tree_str}\n'
     )
     # import logging
@@ -56,8 +81,13 @@ def dump_task_tree() -> None:
     #     ).exception("Error printing task tree")


-def signal_handler(sig: int, frame: object) -> None:
-    import traceback
+def signal_handler(
+    sig: int,
+    frame: object,
+    relay_to_subs: bool = True,
+
+) -> None:
     try:
         trio.lowlevel.current_trio_token(
         ).run_sync_soon(dump_task_tree)
@@ -65,6 +95,26 @@ def signal_handler(sig: int, frame: object) -> None:
         # not in async context -- print a normal traceback
         traceback.print_stack()

+    if not relay_to_subs:
+        return
+
+    an: ActorNursery
+    for an in _state.current_actor()._actoruid2nursery.values():
+
+        subproc: ProcessType
+        subactor: Actor
+        for subactor, subproc, _ in an._children.values():
+            log.pdb(
+                f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
+                f'{subactor}\n'
+                f' |_{subproc}\n'
+            )
+
+            if isinstance(subproc, trio.Process):
+                subproc.send_signal(sig)
+
+            elif isinstance(subproc, mp.Process):
+                subproc._send_signal(sig)


 def enable_stack_on_sig(
@@ -82,3 +132,6 @@ def enable_stack_on_sig(
     # NOTE: not the above can be triggered from
     # a (xonsh) shell using:
     # kill -SIGUSR1 @$(pgrep -f '<cmd>')
+    #
+    # for example if you were looking to trace a `pytest` run
+    # kill -SIGUSR1 @$(pgrep -f 'pytest')


@@ -33,25 +33,29 @@ from __future__ import annotations
 from contextlib import (
     contextmanager as cm,
 )
-# from contextvars import (
-#     ContextVar,
-#     Token,
-# )
+from contextvars import (
+    ContextVar,
+    Token,
+)
 import textwrap
 from typing import (
     Any,
     Callable,
     Type,
+    TYPE_CHECKING,
     Union,
 )
 from types import ModuleType

 import msgspec
-from msgspec import msgpack
-from trio.lowlevel import (
-    RunVar,
-    RunVarToken,
+from msgspec import (
+    msgpack,
+    Raw,
 )
+# from trio.lowlevel import (
+#     RunVar,
+#     RunVarToken,
+# )

 # TODO: see notes below from @mikenerone..
 # from tricycle import TreeVar
@@ -62,6 +66,9 @@ from tractor.msg.types import (
 )
 from tractor.log import get_logger

+if TYPE_CHECKING:
+    from tractor._context import Context

 log = get_logger(__name__)

 # TODO: overall IPC msg-spec features (i.e. in this mod)!
@@ -157,24 +164,6 @@ class MsgCodec(Struct):
     lib: ModuleType = msgspec

-    # TODO: a sub-decoder system as well?
-    # payload_msg_specs: Union[Type[Struct]] = Any
-    # see related comments in `.msg.types`
-    # _payload_decs: (
-    #     dict[
-    #         str,
-    #         msgpack.Decoder,
-    #     ]
-    #     |None
-    # ) = None
-    # OR
-    # ) = {
-    #     # pre-seed decoders for std-py-type-set for use when
-    #     # `MsgType.pld == None|Any`.
-    #     None: msgpack.Decoder(Any),
-    #     Any: msgpack.Decoder(Any),
-    # }
-
     # TODO: use `functools.cached_property` for these ?
     # https://docs.python.org/3/library/functools.html#functools.cached_property
     @property
@@ -210,7 +199,25 @@ class MsgCodec(Struct):
         # https://jcristharif.com/msgspec/usage.html#typed-decoding
         return self._dec.decode(msg)

-    # TODO: do we still want to try and support the sub-decoder with
+    # TODO: a sub-decoder system as well?
+    # payload_msg_specs: Union[Type[Struct]] = Any
+    # see related comments in `.msg.types`
+    # _payload_decs: (
+    #     dict[
+    #         str,
+    #         msgpack.Decoder,
+    #     ]
+    #     |None
+    # ) = None
+    # OR
+    # ) = {
+    #     # pre-seed decoders for std-py-type-set for use when
+    #     # `MsgType.pld == None|Any`.
+    #     None: msgpack.Decoder(Any),
+    #     Any: msgpack.Decoder(Any),
+    # }
+    #
+    # -[ ] do we still want to try and support the sub-decoder with
     # `.Raw` technique in the case that the `Generic` approach gives
     # future grief?
     #
@@ -429,6 +436,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
 #
 _def_tractor_codec: MsgCodec = mk_codec(
     ipc_pld_spec=Any,

+    # TODO: use this for debug mode locking prot?
+    # ipc_pld_spec=Raw,
 )
@@ -462,11 +472,9 @@ _def_tractor_codec: MsgCodec = mk_codec(
 # TODO: STOP USING THIS, since it's basically a global and won't
 # allow sub-IPC-ctxs to limit the msg-spec however desired..
-_ctxvar_MsgCodec: MsgCodec = RunVar(
+# _ctxvar_MsgCodec: MsgCodec = RunVar(
+_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
     'msgspec_codec',
-
-    # TODO: move this to our new `Msg`-spec!
-    # default=_def_msgspec_codec,
     default=_def_tractor_codec,
 )
@@ -475,23 +483,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
 @cm
 def apply_codec(
     codec: MsgCodec,
+    ctx: Context|None = None,

 ) -> MsgCodec:
     '''
-    Dynamically apply a `MsgCodec` to the current task's
-    runtime context such that all IPC msgs are processed
-    with it for that task.
+    Dynamically apply a `MsgCodec` to the current task's runtime
+    context such that all (of a certain class of payload
+    containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
+    processed with it for that task.
+
+    Uses a `contextvars.ContextVar` to ensure the scope of any
+    codec setting matches the current `Context` or
+    `._rpc.process_messages()` feeder task's prior setting without
+    mutating any surrounding scope.
+
+    When a `ctx` is supplied, only mod its `Context.pld_codec`.

-    Uses a `tricycle.TreeVar` to ensure the scope of the codec
     matches the `@cm` block and DOES NOT change to the original
     (default) value in new tasks (as it does for `ContextVar`).
-
-    See the docs:
-    - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
-    - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py

     '''
     __tracebackhide__: bool = True
-    orig: MsgCodec = _ctxvar_MsgCodec.get()
+
+    if ctx is not None:
+        var: ContextVar = ctx._var_pld_codec
+    else:
+        # use IPC channel-connection "global" codec
+        var: ContextVar = _ctxvar_MsgCodec
+
+    orig: MsgCodec = var.get()
     assert orig is not codec
     if codec.pld_spec is None:
         breakpoint()
@@ -500,22 +521,25 @@ def apply_codec(
         'Applying new msg-spec codec\n\n'
         f'{codec}\n'
     )
-    token: RunVarToken = _ctxvar_MsgCodec.set(codec)
+    token: Token = var.set(codec)

-    # TODO: for TreeVar approach, see docs for @cm `.being()` API:
-    # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
-    # try:
+    # ?TODO? for TreeVar approach which copies from the
+    # cancel-scope of the prior value, NOT the prior task
+    # See the docs:
+    # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
+    # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
+    # ^- see docs for @cm `.being()` API
     # with _ctxvar_MsgCodec.being(codec):
     #     new = _ctxvar_MsgCodec.get()
    #     assert new is codec
    #     yield codec
+
     try:
-        yield _ctxvar_MsgCodec.get()
+        yield var.get()
     finally:
-        _ctxvar_MsgCodec.reset(token)
+        var.reset(token)

-    assert _ctxvar_MsgCodec.get() is orig
+    assert var.get() is orig
     log.info(
         'Reverted to last msg-spec codec\n\n'
         f'{orig}\n'

@@ -76,9 +76,11 @@ class NamespacePath(str):
         return self._ref

     @staticmethod
-    def _mk_fqnp(ref: type | object) -> tuple[str, str]:
+    def _mk_fqnp(
+        ref: type|object,
+    ) -> tuple[str, str]:
         '''
-        Generate a minial ``str`` pair which describes a python
+        Generate a minial `str` pair which describes a python
         object's namespace path and object/type name.

         In more precise terms something like:
@@ -87,10 +89,9 @@ class NamespacePath(str):
           of THIS type XD

         '''
-        if (
-            isfunction(ref)
-        ):
+        if isfunction(ref):
             name: str = getattr(ref, '__name__')
+            mod_name: str = ref.__module__

         elif ismethod(ref):
             # build out the path manually i guess..?
@@ -99,15 +100,19 @@ class NamespacePath(str):
                 type(ref.__self__).__name__,
                 ref.__func__.__name__,
             ])
+            mod_name: str = ref.__self__.__module__

         else:  # object or other?
             # isinstance(ref, object)
             # and not isfunction(ref)
             name: str = type(ref).__name__
+            mod_name: str = ref.__module__

+        # TODO: return static value direactly?
+        #
         # fully qualified namespace path, tuple.
         fqnp: tuple[str, str] = (
-            ref.__module__,
+            mod_name,
             name,
         )
         return fqnp