Compare commits

..

19 Commits

Author SHA1 Message Date
Tyler Goodlet b761524a85 TOSQASH 22e62ed: with-stmt-ws-removal 2025-09-29 20:23:45 -04:00
Tyler Goodlet b4dbf5dd86 WIP, expand pldrx suite for tagged-multi-msgs
Nowhere near ready yet since this generates many test-body-logic
un-handled true-positives which currently fail, but it's a first draft
for the general case set. To start, includes a greater-than-one-`Msg` (a
strict top level tagged-union-of-structs) pld spec alongside
a `AnyFieldMsg`-workaround struct-msg for packing all other builtin
non-`Struct`/`Any` python types alongside the other explicit msgs.
2025-09-29 20:13:58 -04:00
Tyler Goodlet 39952344cb Ext-types test suite clean out
Removing the now masked-for-a-while unit test remnants for
`test_limit_msgspec()` (and its helper `chk_pld_type()`) since these cases
are now covered in the `test_pldrx_limiting` suite at an e2e
IPC-system-spanning level.

Note that the contents of the `chk_pld_type()` might be useful in the
future once we start setting/allowing semantics for various "phases of
IPC with matching msgspecs", but that's a little ways off rn and this
commit can always be looked up, also iirc most of the details were
already somewhat out of date and causing suite failure.
2025-09-29 11:58:15 -04:00
Tyler Goodlet 15f58495d5 Add todo-note for non-strict `msgspec` decode-mode? 2025-09-29 11:41:46 -04:00
Tyler Goodlet 2be3f93a8f Set `hide_tb` at top of `.limit_plds()` body 2025-09-25 22:05:20 -04:00
Tyler Goodlet 224e92b468 Always merge input `spec` with any `ext_types`
That is, in `.msg._codec.mk_dec()` to ensure we actually still respect
the provided `spec: Union[Type[Struct]]|Type|None` alongside any
"custom" extension-types expected to be `dec_hook()` pre-processed.

Notes,
- previously when `dec_hook()` was provided we were merging with
  a `msgspec.Raw` instead of `spec` which **is entirely wrong**; it was
  likely leftover code from the sloppy/naive first draft of extension
  types support.
- notice the `spec: Union[Type[Struct]]|Type|None` type annotation (and
  it appears as though a `test_ext_types_msgspec` suite actually passes
  the value `spec=None` fyi) with a value of `None` to imply merging as
  `Union[ext_types]|None` (or equivalently a `Optional[Union]`), due
  to the incorrect `Raw`-default usage this was actually being ignored..
  -> this case has now been clarified via comment in the fn-signature.
2025-09-25 19:39:18 -04:00
Tyler Goodlet ccedee3b87 Dynamically set `pld_spec` for `test_basic_payload_spec()
Such that we can parametrize the `@context(pld_spec)` endpoint setting
using `pytest` and of course enable testing more then just the lone
`maybe_msg_spec` case. The implementation was a bit tricky because
subactors import any `enable_modules` just after subproc spawn, so
there's no easy way to indicate from the parent should should be passed
to the `@context()` decorator since it's already resolved by the time an
IPC is established. Thus the bulk of this patch is implementing
a pre-ctx which monkey-patches the (test) `child()`-ep-defining-module
before running test logic.

Impl deats,
- drop `maybe_msg_spec` global instead providing the same value via
  a new `pld_spec: Union[Type]` parametrized input to the test suite.
- add a `decorate_child_ep()` helper which (re-)decorates the
  mod-defined `child()` IPC-context endpoint with the provided `pld_spec`.
- add a new "pre IPC context" endpoint: `set_chld_pldspec()` which can
  be opened (from another actor) just prior to opening the `child()` ep
  and it will decorate the latter (using `decorate_child_ep()`)
  presuming a `.msg._exts.enc_type_union()` generated  `pld_spec_strs`
  is provided.
- actually open the `set_chld_pldspec()` as a `deco_ctx` rom the
  root-actor and ensure we cancel it on block teardown in non-raising
  cases.
2025-09-25 19:23:09 -04:00
Tyler Goodlet 7d947d3776 Add `types`-mod to `.msg._exts.dec_type_union()`
Such that decoded output equivalent to `str|None` can actually be
unpacked from a `type_names = ['str', 'NoneType]` without just
ignoring the null-type entry.. Previously, the loop would fall through
silently ignoring the `None` -> `NoneType` string representation mapped
by `.enc_type_union()` and the output union would be incorrect.

Deats,
- include the stdlib's `types` in the lookup loop, obvi changing the
  output var's name to `_types` to not collide.
- add output checking versus input `type_names` such that we raise
  a value-error with a case specific `report: str` when either,
  * the output `_types: list[Type]` is empty,
  * the `len(_types) != len(type_names)`.
2025-09-25 18:23:44 -04:00
Tyler Goodlet 6b3cc72e5c Mv `load_module_from_path()` to a new `._code_load` submod 2025-09-25 12:19:12 -04:00
Tyler Goodlet 81c33bf550 Extend `.to_asyncio.LinkedTaskChannel` for aio side
With methods to comms similar to those that exist for the `trio` side,
- `.get()` which proxies verbatim to the `._to_aio: asyncio.Queue`,
- `.send_nowait()` which thin-wraps to `._to_trio: trio.MemorySendChannel`.

Obviously the more correct design is to break up the channel type into
a pair of handle types, one for each "side's" task in each event-loop,
that's hopefully coming shortly in a follow up patch B)

Also,
- fill in some missing doc strings, tweak some explanation comments and
  update todos.
- adjust the `test_aio_errors_and_channel_propagates_and_closes()` suite
  to use the new `chan` fn-sig-API with `.open_channel_from()` including
  the new methods for msg comms; ensures everything added here works e2e.
2025-09-21 15:53:45 -04:00
Tyler Goodlet fee1ee315c Hide `._rpc._invoke()` frame, again.. 2025-09-12 12:19:27 -04:00
Tyler Goodlet 22e62ed88e Explain the `infect_asyncio: bool` param to pass in RTE msg 2025-09-12 12:19:27 -04:00
Tyler Goodlet fdba9e42d3 Toss in masked `.set_trace()` for unshielded `.pause()` debug 2025-09-12 12:19:27 -04:00
Tyler Goodlet 3ec72e6af8 Mask tpt-closed handling of `chan.send(return_msg)`
A partial revert of commit c05d08e426 since it seem we already
suppress tpt-closed errors lower down in `.ipc.Channel.send()`; given
that i'm pretty sure this new handler code should basically never run?

Left in a todo to remove the masked content once i'm done more
thoroughly testing under `piker`.
2025-09-12 12:19:27 -04:00
Tyler Goodlet c538cb3004 More `TransportClosed`-handling around IPC-IO
For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports-n-reraises the exc (same as prior behaviour).
  * originally i thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * hence the also-added-bu-masked-out `debug_filter` / guard expression
    around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
2025-09-12 12:19:27 -04:00
Tyler Goodlet 8842b758d7 Use new `pkg_name` in log-sys test suites 2025-09-11 17:05:35 -04:00
Tyler Goodlet 54ee624632 Implicitly name sub-logs by caller's mod
That is when no `name` is passed to `get_logger()`, try to introspect
the caller's `module.__name__` and use it to infer/get the "namespace
path" to that module the same as if using `name=__name__` as in the most
common usage.

Further, change the `_root_name` to be `pkg_name: str`, a public and
more obvious param name, and deprecate the former. This obviously adds
the necessary impl to make the new
`test_sys_log::test_implicit_mod_name_applied_for_child` test pass.

Impl detalles for `get_logger()`,
- add `pkg_name` and deprecate `_root_name`, include failover logic
  and a warning.
- implement calling module introspection using
  `inspect.stack()/getmodule()` to get both the `.__name__` and
  `.__package__` info alongside adjusted logic to set the `name`
  when not provided but only when a new `mk_sublog: bool` is set.
- tweak the `name` processing for implicitly set case,
  - rename `sub_name` -> `pkg_path: str` which is the path
    to the calling module minus that module's name component.
  - only partition `name` if `pkg_name` is `in` it.
  - use the `_root_log` for `pkg_name` duplication warnings.

Other/related,
- add types to various public mod vars missing them.
- rename `.log.log` -> `.log._root_log`.
2025-09-11 17:05:35 -04:00
Tyler Goodlet e8f2dfc088 Add an implicit-pkg-path-as-logger-name test
A bit of test driven dev to anticipate support  of `.log.get_logger()`
usage such that it can be called from arbitrary sub-modules, themselves
embedded in arbitrary sub-pkgs, of some project; the when not provided,
the `sub_name` passed to the `Logger.getChild(<sub_name>)` will be set
as the sub-pkg path "down to" the calling module.

IOW if you call something like,

`log = tractor.log.get_logger(pkg_name='mypylib')`

from some `submod.py` in a project-dir that looks like,

mypylib/
  mod.py
  subpkg/
    submod.py  <- calling module

the `log: StackLevelAdapter` child-`Logger` instance will have a
`.name: str = 'mypylib.subpkg'`, discluding the `submod` part since this
already rendered as the `{filename}` header in `log.LOG_FORMAT`.

Previously similar behaviour would be obtained by passing
`get_logger(name=__name__)` in the calling module and so much so it
motivated me to make this the default, presuming we can introspect for
the info.

Impl deats,
- duplicated a `load_module_from_path()` from `modden` to load the
  `testdir` rendered py project dir from its path.
 |_should prolly factor it down to this lib anyway bc we're going to
   need it for hot code reload? (well that and `watchfiles` Bp)
- in each of `mod.py` and `submod.py` render the `get_logger()` code
  sin `name`, expecting the (coming shortly) implicit introspection
  feat to do this.
- do `.name` and `.parent` checks against expected sub-logger values
  from `StackLevelAdapter.logger.getChildren()`.
2025-09-11 17:05:35 -04:00
Tyler Goodlet d2282f4275 Start a logging-sys unit-test module
To start ensuring that when `name=__name__` is passed we try to
de-duplicate the `_root_name` and any `leaf_mod: str` since it's already
included in the headers as `{filename}`.

Deats,
- heavily document the de-duplication `str.partition()`s in
  `.log.get_logger()` and provide the end fix by changing the predicate,
  `if rname == 'tractor':` -> `if rname == _root_name`.
  * also toss in some warnings for when we still detect duplicates.
- add todo comments around logging "filters" (vs. our "adapter").
- create the new `test_log_sys.test_root_pkg_not_duplicated()` which
  runs green with the fixes from ^.
- add a ton of test-suite todos both for existing and anticipated
  logging sys feats in the new mod.
2025-09-11 17:05:35 -04:00
12 changed files with 708 additions and 310 deletions

View File

@ -454,7 +454,7 @@ async def send_back_values(
with ( with (
maybe_apply_codec(nsp_codec) as codec, maybe_apply_codec(nsp_codec) as codec,
limit_plds( limit_plds(
rent_pld_spec, spec=rent_pld_spec,
dec_hook=dec_nsp if add_hooks else None, dec_hook=dec_nsp if add_hooks else None,
ext_types=[NamespacePath] if add_hooks else None, ext_types=[NamespacePath] if add_hooks else None,
) as pld_dec, ) as pld_dec,
@ -665,7 +665,9 @@ def test_ext_types_over_ipc(
expect_codec=nsp_codec, expect_codec=nsp_codec,
enter_value=codec, enter_value=codec,
) )
rent_pld_spec_type_strs: list[str] = _exts.enc_type_union(pld_spec) rent_pld_spec_type_strs: list[str] = _exts.enc_type_union(
pld_spec
)
# XXX should raise an mte (`MsgTypeError`) # XXX should raise an mte (`MsgTypeError`)
# when `add_hooks == False` bc the input # when `add_hooks == False` bc the input
@ -695,7 +697,7 @@ def test_ext_types_over_ipc(
limit_plds( limit_plds(
pld_spec, pld_spec,
dec_hook=dec_nsp if add_hooks else None, dec_hook=dec_nsp if add_hooks else None,
ext_types=[NamespacePath] if add_hooks else None, ext_types=[NamespacePath] if add_hooks else None,
) as pld_dec, ) as pld_dec,
): ):
ctx_pld_dec: MsgDec = ctx._pld_rx._pld_dec ctx_pld_dec: MsgDec = ctx._pld_rx._pld_dec
@ -704,7 +706,7 @@ def test_ext_types_over_ipc(
# if ( # if (
# not add_hooks # not add_hooks
# and # and
# NamespacePath in # NamespacePath in
# ): # ):
# pytest.fail('ctx should fail to open without custom enc_hook!?') # pytest.fail('ctx should fail to open without custom enc_hook!?')
@ -743,204 +745,10 @@ def test_ext_types_over_ipc(
assert exc.boxed_type is TypeError assert exc.boxed_type is TypeError
# def chk_pld_type( # TODO: further SC-msg-specific verification that the overridden
# payload_spec: Type[Struct]|Any, # subtypes DO NOT have modified type-annots from original!
# pld: Any, # 'Start', .pld: FuncSpec
# 'StartAck', .pld: IpcCtxSpec
# expect_roundtrip: bool|None = None, # 'Stop', .pld: UNSEt
# 'Error', .pld: ErrorData
# ) -> bool: # def test_per_msg_payload_spec_limits():
# pld_val_type: Type = type(pld)
# # TODO: verify that the overridden subtypes
# # DO NOT have modified type-annots from original!
# # 'Start', .pld: FuncSpec
# # 'StartAck', .pld: IpcCtxSpec
# # 'Stop', .pld: UNSEt
# # 'Error', .pld: ErrorData
# codec: MsgCodec = mk_codec(
# # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
# # type union.
# ipc_pld_spec=payload_spec,
# )
# # make a one-off dec to compare with our `MsgCodec` instance
# # which does the below `mk_msg_spec()` call internally
# ipc_msg_spec: Union[Type[Struct]]
# msg_types: list[PayloadMsg[payload_spec]]
# (
# ipc_msg_spec,
# msg_types,
# ) = mk_msg_spec(
# payload_type_union=payload_spec,
# )
# _enc = msgpack.Encoder()
# _dec = msgpack.Decoder(
# type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
# )
# assert (
# payload_spec
# ==
# codec.pld_spec
# )
# # assert codec.dec == dec
# #
# # ^-XXX-^ not sure why these aren't "equal" but when cast
# # to `str` they seem to match ?? .. kk
# assert (
# str(ipc_msg_spec)
# ==
# str(codec.msg_spec)
# ==
# str(_dec.type)
# ==
# str(codec.dec.type)
# )
# # verify the boxed-type for all variable payload-type msgs.
# if not msg_types:
# breakpoint()
# roundtrip: bool|None = None
# pld_spec_msg_names: list[str] = [
# td.__name__ for td in _payload_msgs
# ]
# for typedef in msg_types:
# skip_runtime_msg: bool = typedef.__name__ not in pld_spec_msg_names
# if skip_runtime_msg:
# continue
# pld_field = structs.fields(typedef)[1]
# assert pld_field.type is payload_spec # TODO-^ does this need to work to get all subtypes to adhere?
# kwargs: dict[str, Any] = {
# 'cid': '666',
# 'pld': pld,
# }
# enc_msg: PayloadMsg = typedef(**kwargs)
# _wire_bytes: bytes = _enc.encode(enc_msg)
# wire_bytes: bytes = codec.enc.encode(enc_msg)
# assert _wire_bytes == wire_bytes
# ve: ValidationError|None = None
# try:
# dec_msg = codec.dec.decode(wire_bytes)
# _dec_msg = _dec.decode(wire_bytes)
# # decoded msg and thus payload should be exactly same!
# assert (roundtrip := (
# _dec_msg
# ==
# dec_msg
# ==
# enc_msg
# ))
# if (
# expect_roundtrip is not None
# and expect_roundtrip != roundtrip
# ):
# breakpoint()
# assert (
# pld
# ==
# dec_msg.pld
# ==
# enc_msg.pld
# )
# # assert (roundtrip := (_dec_msg == enc_msg))
# except ValidationError as _ve:
# ve = _ve
# roundtrip: bool = False
# if pld_val_type is payload_spec:
# raise ValueError(
# 'Got `ValidationError` despite type-var match!?\n'
# f'pld_val_type: {pld_val_type}\n'
# f'payload_type: {payload_spec}\n'
# ) from ve
# else:
# # ow we good cuz the pld spec mismatched.
# print(
# 'Got expected `ValidationError` since,\n'
# f'{pld_val_type} is not {payload_spec}\n'
# )
# else:
# if (
# payload_spec is not Any
# and
# pld_val_type is not payload_spec
# ):
# raise ValueError(
# 'DID NOT `ValidationError` despite expected type match!?\n'
# f'pld_val_type: {pld_val_type}\n'
# f'payload_type: {payload_spec}\n'
# )
# # full code decode should always be attempted!
# if roundtrip is None:
# breakpoint()
# return roundtrip
# ?TODO? maybe remove since covered in the newer `test_pldrx_limiting`
# via end-2-end testing of all this?
# -[ ] IOW do we really NEED this lowlevel unit testing?
#
# def test_limit_msgspec(
# debug_mode: bool,
# ):
# '''
# Internals unit testing to verify that type-limiting an IPC ctx's
# msg spec with `Pldrx.limit_plds()` results in various
# encapsulated `msgspec` object settings and state.
# '''
# async def main():
# async with tractor.open_root_actor(
# debug_mode=debug_mode,
# ):
# # ensure we can round-trip a boxing `PayloadMsg`
# assert chk_pld_type(
# payload_spec=Any,
# pld=None,
# expect_roundtrip=True,
# )
# # verify that a mis-typed payload value won't decode
# assert not chk_pld_type(
# payload_spec=int,
# pld='doggy',
# )
# # parametrize the boxed `.pld` type as a custom-struct
# # and ensure that parametrization propagates
# # to all payload-msg-spec-able subtypes!
# class CustomPayload(Struct):
# name: str
# value: Any
# assert not chk_pld_type(
# payload_spec=CustomPayload,
# pld='doggy',
# )
# assert chk_pld_type(
# payload_spec=CustomPayload,
# pld=CustomPayload(name='doggy', value='urmom')
# )
# # yah, we can `.pause_from_sync()` now!
# # breakpoint()
# trio.run(main)

View File

@ -732,15 +732,21 @@ def test_aio_errors_and_channel_propagates_and_closes(
async def aio_echo_server( async def aio_echo_server(
to_trio: trio.MemorySendChannel, chan: to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
) -> None: ) -> None:
'''
An IPC-msg "echo server" with msgs received and relayed by
a parent `trio.Task` into a child `asyncio.Task`
and then repeated back to that local parent (`trio.Task`)
and sent again back to the original calling remote actor.
to_trio.send_nowait('start') '''
# same semantics as `trio.TaskStatus.started()`
chan.started_nowait('start')
while True: while True:
try: try:
msg = await from_trio.get() msg = await chan.get()
except to_asyncio.TrioTaskExited: except to_asyncio.TrioTaskExited:
print( print(
'breaking aio echo loop due to `trio` exit!' 'breaking aio echo loop due to `trio` exit!'
@ -748,7 +754,7 @@ async def aio_echo_server(
break break
# echo the msg back # echo the msg back
to_trio.send_nowait(msg) chan.send_nowait(msg)
# if we get the terminate sentinel # if we get the terminate sentinel
# break the echo loop # break the echo loop
@ -765,7 +771,10 @@ async def trio_to_aio_echo_server(
): ):
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
aio_echo_server, aio_echo_server,
) as (first, chan): ) as (
first, # value from `chan.started_nowait()` above
chan,
):
assert first == 'start' assert first == 'start'
await ctx.started(first) await ctx.started(first)
@ -776,7 +785,8 @@ async def trio_to_aio_echo_server(
await chan.send(msg) await chan.send(msg)
out = await chan.receive() out = await chan.receive()
# echo back to parent actor-task
# echo back to parent-actor's remote parent-ctx-task!
await stream.send(out) await stream.send(out)
if out is None: if out is None:
@ -1090,14 +1100,12 @@ def test_sigint_closes_lifetime_stack(
# ?TODO asyncio.Task fn-deco? # ?TODO asyncio.Task fn-deco?
# -[ ] do sig checkingat import time like @context?
# -[ ] maybe name it @aio_task ??
# -[ ] chan: to_asyncio.InterloopChannel ?? # -[ ] chan: to_asyncio.InterloopChannel ??
# -[ ] do fn-sig checking at import time like @context?
# |_[ ] maybe name it @a(sync)io_task ??
# @asyncio_task <- not bad ??
async def raise_before_started( async def raise_before_started(
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel, chan: to_asyncio.LinkedTaskChannel,
) -> None: ) -> None:
''' '''
`asyncio.Task` entry point which RTEs before calling `asyncio.Task` entry point which RTEs before calling

View File

@ -0,0 +1,150 @@
'''
`tractor.log`-wrapping unit tests.
'''
from pathlib import Path
import shutil
import pytest
import tractor
from tractor import _code_load
def test_root_pkg_not_duplicated_in_logger_name():
'''
When both `pkg_name` and `name` are passed and they have
a common `<root_name>.< >` prefix, ensure that it is not
duplicated in the child's `StackLevelAdapter.name: str`.
'''
project_name: str = 'pylib'
pkg_path: str = 'pylib.subpkg.mod'
proj_log = tractor.log.get_logger(
pkg_name=project_name,
mk_sublog=False,
)
sublog = tractor.log.get_logger(
pkg_name=project_name,
name=pkg_path,
)
assert proj_log is not sublog
assert sublog.name.count(proj_log.name) == 1
assert 'mod' not in sublog.name
def test_implicit_mod_name_applied_for_child(
testdir: pytest.Pytester,
loglevel: str,
):
'''
Verify that when `.log.get_logger(pkg_name='pylib')` is called
from a given sub-mod from within the `pylib` pkg-path, we
implicitly set the equiv of `name=__name__` from the caller's
module.
'''
# tractor.log.get_console_log(level=loglevel)
proj_name: str = 'snakelib'
mod_code: str = (
f'import tractor\n'
f'\n'
f'log = tractor.log.get_logger(pkg_name="{proj_name}")\n'
)
# create a sub-module for each pkg layer
_lib = testdir.mkpydir(proj_name)
pkg: Path = Path(_lib)
subpkg: Path = pkg / 'subpkg'
subpkg.mkdir()
pkgmod: Path = subpkg / "__init__.py"
pkgmod.touch()
_submod: Path = testdir.makepyfile(
_mod=mod_code,
)
pkg_mod = pkg / 'mod.py'
pkg_subpkg_submod = subpkg / 'submod.py'
shutil.copyfile(
_submod,
pkg_mod,
)
shutil.copyfile(
_submod,
pkg_subpkg_submod,
)
testdir.chdir()
# XXX NOTE, once the "top level" pkg mod has been
# imported, we can then use `import` syntax to
# import it's sub-pkgs and modules.
pkgmod = _code_load.load_module_from_path(
Path(pkg / '__init__.py'),
module_name=proj_name,
)
pkg_root_log = tractor.log.get_logger(
pkg_name=proj_name,
mk_sublog=False,
)
assert pkg_root_log.name == proj_name
assert not pkg_root_log.logger.getChildren()
from snakelib import mod
assert mod.log.name == proj_name
from snakelib.subpkg import submod
assert (
submod.log.name
==
submod.__package__ # ?TODO, use this in `.get_logger()` instead?
==
f'{proj_name}.subpkg'
)
sub_logs = pkg_root_log.logger.getChildren()
assert len(sub_logs) == 1 # only one nested sub-pkg module
assert submod.log.logger in sub_logs
# breakpoint()
# TODO, moar tests against existing feats:
# ------ - ------
# - [ ] color settings?
# - [ ] header contents like,
# - actor + thread + task names from various conc-primitives,
# - [ ] `StackLevelAdapter` extensions,
# - our custom levels/methods: `transport|runtime|cance|pdb|devx`
# - [ ] custom-headers support?
#
# TODO, test driven dev of new-ideas/long-wanted feats,
# ------ - ------
# - [ ] https://github.com/goodboy/tractor/issues/244
# - [ ] @catern mentioned using a sync / deterministic sys
# and in particular `svlogd`?
# |_ https://smarden.org/runit/svlogd.8
# - [ ] using adapter vs. filters?
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
# - [ ] `.at_least_level()` optimization which short circuits wtv
# `logging` is doing behind the scenes when the level filters
# the emission..?
# - [ ] use of `.log.get_console_log()` in subactors and the
# subtleties of ensuring it actually emits from a subproc.
# - [ ] this idea of activating per-subsys emissions with some
# kind of `.name` filter passed to the runtime or maybe configured
# via the root `StackLevelAdapter`?
# - [ ] use of `logging.dict.dictConfig()` to simplify the impl
# of any of ^^ ??
# - https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# - https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
# - https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig

View File

@ -7,7 +7,15 @@ related settings around IPC contexts.
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
import sys
import types
from typing import (
Any,
Union,
Type,
)
import msgspec
from msgspec import ( from msgspec import (
Struct, Struct,
) )
@ -22,11 +30,10 @@ from tractor import (
Portal, Portal,
) )
from tractor.msg import ( from tractor.msg import (
_codec,
_ops as msgops, _ops as msgops,
Return, Return,
) _exts,
from tractor.msg import (
_codec,
) )
from tractor.msg.types import ( from tractor.msg.types import (
log, log,
@ -41,13 +48,22 @@ class PldMsg(
# case of these details? # case of these details?
# #
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag=True, tag=True,
# tag_field='msg_type', tag_field='msg_type',
): ):
field: str field: str
maybe_msg_spec = PldMsg|None class Msg1(PldMsg):
field: str
class Msg2(PldMsg):
field: int
class AnyFieldMsg(PldMsg):
field: Any
@acm @acm
@ -104,9 +120,15 @@ async def maybe_expect_raises(
) )
@tractor.context( # NOTE, this decorator is applied dynamically by both the root and
pld_spec=maybe_msg_spec, # 'sub' actor such that we can dynamically apply various cases from
) # a parametrized test.
#
# maybe_msg_spec = PldMsg|None
#
# @tractor.context(
# pld_spec=maybe_msg_spec,
# )
async def child( async def child(
ctx: Context, ctx: Context,
started_value: int|PldMsg|None, started_value: int|PldMsg|None,
@ -114,13 +136,13 @@ async def child(
validate_pld_spec: bool, validate_pld_spec: bool,
raise_on_started_mte: bool = True, raise_on_started_mte: bool = True,
pack_any_field: bool = False,
) -> None: ) -> None:
''' '''
Call ``Context.started()`` more then once (an error). Call ``Context.started()`` more then once (an error).
''' '''
expect_started_mte: bool = started_value == 10
# sanaity check that child RPC context is the current one # sanaity check that child RPC context is the current one
curr_ctx: Context = current_ipc_ctx() curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx assert ctx is curr_ctx
@ -128,6 +150,7 @@ async def child(
rx: msgops.PldRx = ctx._pld_rx rx: msgops.PldRx = ctx._pld_rx
curr_pldec: _codec.MsgDec = rx.pld_dec curr_pldec: _codec.MsgDec = rx.pld_dec
ctx_meta: dict = getattr( ctx_meta: dict = getattr(
child, child,
'_tractor_context_meta', '_tractor_context_meta',
@ -136,10 +159,28 @@ async def child(
if ctx_meta: if ctx_meta:
assert ( assert (
ctx_meta['pld_spec'] ctx_meta['pld_spec']
is curr_pldec.spec is
is curr_pldec.pld_spec curr_pldec.spec
is
curr_pldec.pld_spec
) )
pld_types: set[Type] = _codec.unpack_spec_types(
curr_pldec.pld_spec,
)
if (
AnyFieldMsg in pld_types
and
pack_any_field
):
started_value = AnyFieldMsg(field=started_value)
expect_started_mte: bool = (
started_value == 10
and
not pack_any_field
)
# 2 cases: hdndle send-side and recv-only validation # 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate # - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation # - else, parent-recv-side only validation
@ -219,16 +260,65 @@ async def child(
# msg-type-error from this RPC task ;) # msg-type-error from this RPC task ;)
return return_value return return_value
def decorate_child_ep(
pld_spec: Union[Type],
) -> types.ModuleType:
'''
Apply parametrized pld_spec to ctx ep like,
@tractor.context(
pld_spec=maybe_msg_spec,
)(child)
'''
this_mod = sys.modules[__name__]
global child # a mod-fn defined above
assert this_mod.child is child
this_mod.child = tractor.context(
pld_spec=pld_spec,
)(child)
return this_mod
@tractor.context
async def set_chld_pldspec(
ctx: tractor.Context,
pld_spec_strs: list[str],
):
'''
Dynamically apply the `@context(pld_spec=pld_spec)` deco to the
current actor's in-mem instance of this test module.
Allows dynamically applying the "payload-spec" in both a parent
and child actor after spawn.
'''
this_mod = sys.modules[__name__]
pld_spec: list[str] = _exts.dec_type_union(
pld_spec_strs,
mods=[
this_mod,
msgspec.inspect,
],
)
decorate_child_ep(pld_spec)
await ctx.started()
await trio.sleep_forever()
@pytest.mark.parametrize( @pytest.mark.parametrize(
'return_value', 'return_value',
[ [
'yo', 'yo',
None, None,
Msg2(field=10),
AnyFieldMsg(field='yo'),
], ],
ids=[ ids=[
'return[invalid-"yo"]', 'return[invalid-"yo"]',
'return[valid-None]', 'return[maybe-valid-None]',
'return[maybe-valid-Msg2]',
'return[maybe-valid-any-packed-yo]',
], ],
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -236,10 +326,14 @@ async def child(
[ [
10, 10,
PldMsg(field='yo'), PldMsg(field='yo'),
Msg1(field='yo'),
AnyFieldMsg(field=10),
], ],
ids=[ ids=[
'Started[invalid-10]', 'Started[invalid-10]',
'Started[valid-PldMsg]', 'Started[maybe-valid-PldMsg]',
'Started[maybe-valid-Msg1]',
'Started[maybe-valid-any-packed-10]',
], ],
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -253,12 +347,31 @@ async def child(
'no-started-pld-validate', 'no-started-pld-validate',
], ],
) )
@pytest.mark.parametrize(
'pld_spec',
[
PldMsg|None,
# demo how to have strict msgs alongside all other supported
# py-types by embedding the any-types inside a shuttle msg.
Msg1|Msg2|AnyFieldMsg,
# XXX, will never work since Struct overrides dict.
# https://jcristharif.com/msgspec/usage.html#typed-decoding
# Msg1|Msg2|msgspec.inspect.AnyType,
],
ids=[
'maybe_PldMsg_spec',
'Msg1_or_Msg2_or_AnyFieldMsg_spec',
]
)
def test_basic_payload_spec( def test_basic_payload_spec(
debug_mode: bool, debug_mode: bool,
loglevel: str, loglevel: str,
return_value: str|None, return_value: str|None,
started_value: int|PldMsg, started_value: int|PldMsg,
pld_check_started_value: bool, pld_check_started_value: bool,
pld_spec: Union[Type],
): ):
''' '''
Validate the most basic `PldRx` msg-type-spec semantics around Validate the most basic `PldRx` msg-type-spec semantics around
@ -267,16 +380,33 @@ def test_basic_payload_spec(
pld-spec. pld-spec.
''' '''
invalid_return: bool = return_value == 'yo' pld_types: set[Type] = _codec.unpack_spec_types(pld_spec)
invalid_started: bool = started_value == 10 invalid_return: bool = (
return_value == 'yo'
)
invalid_started: bool = (
started_value == 10
)
# dynamically apply ep's pld-spec in 'root'.
decorate_child_ep(pld_spec)
assert (
child._tractor_context_meta['pld_spec'] == pld_spec
)
pld_spec_strs: list[str] = _exts.enc_type_union(
pld_spec,
)
assert len(pld_types) > 1
async def main(): async def main():
nonlocal pld_spec
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
loglevel=loglevel, loglevel=loglevel,
) as an: ) as an:
p: Portal = await an.start_actor( p: Portal = await an.start_actor(
'child', 'sub',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -286,9 +416,11 @@ def test_basic_payload_spec(
if invalid_started: if invalid_started:
msg_type_str: str = 'Started' msg_type_str: str = 'Started'
bad_value: int = 10 bad_value: int = 10
elif invalid_return: elif invalid_return:
msg_type_str: str = 'Return' msg_type_str: str = 'Return'
bad_value: str = 'yo' bad_value: str = 'yo'
else: else:
# XXX but should never be used below then.. # XXX but should never be used below then..
msg_type_str: str = '' msg_type_str: str = ''
@ -302,6 +434,7 @@ def test_basic_payload_spec(
invalid_started invalid_started
) else None ) else None
) )
async with ( async with (
maybe_expect_raises( maybe_expect_raises(
raises=should_raise, raises=should_raise,
@ -315,6 +448,11 @@ def test_basic_payload_spec(
# only for debug # only for debug
# post_mortem=True, # post_mortem=True,
), ),
p.open_context(
set_chld_pldspec,
pld_spec_strs=pld_spec_strs,
) as (deco_ctx, _),
p.open_context( p.open_context(
child, child,
return_value=return_value, return_value=return_value,
@ -325,12 +463,18 @@ def test_basic_payload_spec(
# now opened with 'child' sub # now opened with 'child' sub
assert current_ipc_ctx() is ctx assert current_ipc_ctx() is ctx
assert type(first) is PldMsg # assert type(first) is PldMsg
assert isinstance(first, PldMsg)
assert first.field == 'yo' assert first.field == 'yo'
try: try:
res: None|PldMsg = await ctx.result(hide_tb=False) res: None|PldMsg = await ctx.result(hide_tb=False)
assert res is None assert res == return_value
if res is None:
await tractor.pause()
if isinstance(res, PldMsg):
assert res.field == 10
except MsgTypeError as mte: except MsgTypeError as mte:
maybe_mte = mte maybe_mte = mte
if not invalid_return: if not invalid_return:
@ -356,6 +500,9 @@ def test_basic_payload_spec(
ctx.outcome ctx.outcome
) )
if should_raise is None:
await deco_ctx.cancel()
if should_raise is None: if should_raise is None:
assert maybe_mte is None assert maybe_mte is None

View File

@ -0,0 +1,48 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
(Hot) coad (re-)load utils for python.
'''
import importlib
from pathlib import Path
import sys
from types import ModuleType
# ?TODO, move this into internal libs?
# -[ ] we already use it in `modden.config._pymod` as well
def load_module_from_path(
path: Path,
module_name: str|None = None,
) -> ModuleType:
'''
Taken from SO,
https://stackoverflow.com/a/67208147
which is based on stdlib docs,
https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
'''
module_name = module_name or path.stem
spec = importlib.util.spec_from_file_location(
module_name,
str(path),
)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module

View File

@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
try: try:
yield # run RPC invoke body yield # run RPC invoke body
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via # box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel. # the task's requesting parent IPC-channel.
except ( except (
@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis and debug_kbis
) )
) )
# TODO? better then `debug_filter` below?
and
not isinstance(err, TransportClosed)
): ):
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of # recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n' if _state.debug_mode():
f'|_{ctx}' log.exception(
) f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm( entered_debug = await debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
) )
if not entered_debug: if not entered_debug:
# if we prolly should have entered the REPL but # if we prolly should have entered the REPL but
@ -675,6 +694,22 @@ async def _invoke(
f'{pretty_struct.pformat(return_msg)}\n' f'{pretty_struct.pformat(return_msg)}\n'
) )
await chan.send(return_msg) await chan.send(return_msg)
# ?TODO, remove the below since .send() already
# doesn't raise on tpt-closed?
# try:
# await chan.send(return_msg)
# except TransportClosed:
# log.exception(
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
# f'\n'
# f'{chan}\n'
# f'Channel already disconnected ??\n'
# f'\n'
# f'{pretty_struct.pformat(return_msg)}'
# )
# # ?TODO? will this ever be true though?
# if chan.connected():
# raise
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,

View File

@ -561,6 +561,9 @@ async def _pause(
return return
elif isinstance(pause_err, trio.Cancelled): elif isinstance(pause_err, trio.Cancelled):
__tracebackhide__: bool = False
# XXX, unmask to REPL it.
# mk_pdb().set_trace(frame=inspect.currentframe())
_repl_fail_report += ( _repl_fail_report += (
'You called `tractor.pause()` from an already cancelled scope!\n\n' 'You called `tractor.pause()` from an already cancelled scope!\n\n'
'Consider `await tractor.pause(shield=True)` to make it work B)\n' 'Consider `await tractor.pause(shield=True)` to make it work B)\n'

View File

@ -14,11 +14,22 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
Log like a forester! An enhanced logging subsys.
""" An extended logging layer using (for now) the stdlib's `logging`
+ `colorlog` which embeds concurrency-primitive/runtime info into
records (headers) to help you better grok your distributed systems
built on `tractor`.
'''
from collections.abc import Mapping from collections.abc import Mapping
from inspect import (
FrameInfo,
getmodule,
stack,
)
import sys import sys
import logging import logging
from logging import ( from logging import (
@ -26,8 +37,10 @@ from logging import (
Logger, Logger,
StreamHandler, StreamHandler,
) )
import colorlog # type: ignore from types import ModuleType
import warnings
import colorlog # type: ignore
import trio import trio
from ._state import current_actor from ._state import current_actor
@ -39,7 +52,7 @@ _default_loglevel: str = 'ERROR'
# Super sexy formatting thanks to ``colorlog``. # Super sexy formatting thanks to ``colorlog``.
# (NOTE: we use the '{' format style) # (NOTE: we use the '{' format style)
# Here, `thin_white` is just the layperson's gray. # Here, `thin_white` is just the layperson's gray.
LOG_FORMAT = ( LOG_FORMAT: str = (
# "{bold_white}{log_color}{asctime}{reset}" # "{bold_white}{log_color}{asctime}{reset}"
"{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}"
" {bold_white}{thin_white}({reset}" " {bold_white}{thin_white}({reset}"
@ -51,7 +64,7 @@ LOG_FORMAT = (
" {reset}{bold_white}{thin_white}{message}" " {reset}{bold_white}{thin_white}{message}"
) )
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT: str = '%b %d %H:%M:%S'
# FYI, ERROR is 40 # FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check? # TODO: use a `bidict` to avoid the :155 check?
@ -75,7 +88,10 @@ STD_PALETTE = {
'TRANSPORT': 'cyan', 'TRANSPORT': 'cyan',
} }
BOLD_PALETTE = { BOLD_PALETTE: dict[
str,
dict[int, str],
] = {
'bold': { 'bold': {
level: f"bold_{color}" for level, color in STD_PALETTE.items()} level: f"bold_{color}" for level, color in STD_PALETTE.items()}
} }
@ -97,10 +113,17 @@ def at_least_level(
return False return False
# TODO: this isn't showing the correct '{filename}' # TODO, compare with using a "filter" instead?
# as it did before.. # - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
# |_corresponding dict-config,
# https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig/7507842#7507842
# - [ ] what's the benefit/tradeoffs?
#
class StackLevelAdapter(LoggerAdapter): class StackLevelAdapter(LoggerAdapter):
'''
A (software) stack oriented logger "adapter".
'''
def at_least_level( def at_least_level(
self, self,
level: str, level: str,
@ -284,7 +307,9 @@ class ActorContextInfo(Mapping):
def get_logger( def get_logger(
name: str|None = None, name: str|None = None,
_root_name: str = _proj_name, pkg_name: str = _proj_name,
# XXX, deprecated, use ^
_root_name: str|None = None,
logger: Logger|None = None, logger: Logger|None = None,
@ -293,22 +318,89 @@ def get_logger(
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig # |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema # |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
subsys_spec: str|None = None, subsys_spec: str|None = None,
mk_sublog: bool = True,
) -> StackLevelAdapter: ) -> StackLevelAdapter:
''' '''
Return the `tractor`-library root logger or a sub-logger for Return the `tractor`-library root logger or a sub-logger for
`name` if provided. `name` if provided.
When `name` is left null we try to auto-detect the caller's
`mod.__name__` and use that as a the sub-logger key.
This allows for example creating a module level instance like,
.. code:: python
log = tractor.log.get_logger(_root_name='mylib')
and by default all console record headers will show the caller's
(of any `log.<level>()`-method) correct sub-pkg's
+ py-module-file.
''' '''
if _root_name:
msg: str = (
'The `_root_name: str` param of `get_logger()` is now deprecated.\n'
'Use the new `pkg_name: str` instead, it is the same usage.\n'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
pkg_name: str = _root_name or pkg_name
log: Logger log: Logger
log = rlog = logger or logging.getLogger(_root_name) log = rlog = logger or logging.getLogger(pkg_name)
# Implicitly introspect the caller's module-name whenever `name`
# if left as the null default.
#
# When the `pkg_name` is `in` in the `mod.__name__` we presume
# this instance can be created as a sub-`StackLevelAdapter` and
# that the intention is get free module-path tracing and
# filtering (well once we implement that) oriented around the
# py-module code hierarchy of the consuming project.
if (
pkg_name != _proj_name
and
name is None
and
mk_sublog
):
callstack: list[FrameInfo] = stack()
caller_fi: FrameInfo = callstack[1]
caller_mod: ModuleType = getmodule(caller_fi.frame)
if caller_mod:
# ?how is this `mod.__name__` defined?
# -> well by how the mod is imported..
# |_https://stackoverflow.com/a/15883682
mod_name: str = caller_mod.__name__
mod_pkg: str = caller_mod.__package__
log.info(
f'Generating sub-logger name,\n'
f'{mod_pkg}.{mod_name}\n'
)
# if pkg_name in caller_mod.__package__:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
if (
pkg_name
# and
# pkg_name in mod_name
):
name = mod_name
# XXX, lowlevel debuggin..
# if pkg_name != _proj_name:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
if ( if (
name
and
name != _proj_name name != _proj_name
and
name
): ):
# NOTE: for handling for modules that use `get_logger(__name__)` # NOTE: for handling for modules that use `get_logger(__name__)`
# we make the following stylistic choice: # we make the following stylistic choice:
# - always avoid duplicate project-package token # - always avoid duplicate project-package token
@ -318,24 +410,63 @@ def get_logger(
# since in python the {filename} is always this same # since in python the {filename} is always this same
# module-file. # module-file.
sub_name: None|str = None rname: str = pkg_name
rname, _, sub_name = name.partition('.') pkg_path: str = name
pkgpath, _, modfilename = sub_name.rpartition('.')
# NOTE: for tractor itself never include the last level # ex. modden.runtime.progman
# module key in the name such that something like: eg. # -> rname='modden', _, pkg_path='runtime.progman'
# 'tractor.trionics._broadcast` only includes the first if pkg_name in name:
# 2 tokens in the (coloured) name part. rname, _, pkg_path = name.partition('.')
if rname == 'tractor':
sub_name = pkgpath
if _root_name in sub_name: # ex. modden.runtime.progman
duplicate, _, sub_name = sub_name.partition('.') # -> pkgpath='runtime', _, leaf_mod='progman'
subpkg_path, _, leaf_mod = pkg_path.rpartition('.')
if not sub_name: # NOTE: special usage for passing `name=__name__`,
#
# - remove duplication of any root-pkg-name in the
# (sub/child-)logger name; i.e. never include the
# `pkg_name` *twice* in the top-most-pkg-name/level
#
# -> this happens normally since it is added to `.getChild()`
# and as the name of its root-logger.
#
# => So for ex. (module key in the name) something like
# `name='tractor.trionics._broadcast` is passed,
# only includes the first 2 sub-pkg name-tokens in the
# child-logger's name; the colored "pkg-namespace" header
# will then correctly show the same value as `name`.
if rname == pkg_name:
pkg_path = subpkg_path
# XXX, do some double-checks for duplication of,
# - root-pkg-name, already in root logger
# - leaf-module-name already in `{filename}` header-field
if pkg_name in pkg_path:
_duplicate, _, pkg_path = pkg_path.partition('.')
if _duplicate:
# assert _duplicate == rname
_root_log.warning(
f'Duplicate pkg-name in sub-logger key?\n'
f'pkg_name = {pkg_name!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
if (
leaf_mod
and
leaf_mod in pkg_path
):
_root_log.warning(
f'Duplicate leaf-module-name in sub-logger key?\n'
f'leaf_mod = {leaf_mod!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
if not pkg_path:
log = rlog log = rlog
else: elif mk_sublog:
log = rlog.getChild(sub_name) log = rlog.getChild(pkg_path)
log.level = rlog.level log.level = rlog.level
@ -350,8 +481,13 @@ def get_logger(
for name, val in CUSTOM_LEVELS.items(): for name, val in CUSTOM_LEVELS.items():
logging.addLevelName(val, name) logging.addLevelName(val, name)
# ensure customs levels exist as methods # ensure our custom adapter levels exist as methods
assert getattr(logger, name.lower()), f'Logger does not define {name}' assert getattr(
logger,
name.lower()
), (
f'Logger does not define {name}'
)
return logger return logger
@ -425,4 +561,4 @@ def get_loglevel() -> str:
# global module logger for tractor itself # global module logger for tractor itself
log: StackLevelAdapter = get_logger('tractor') _root_log: StackLevelAdapter = get_logger('tractor')

View File

@ -181,7 +181,11 @@ class MsgDec(Struct):
def mk_dec( def mk_dec(
spec: Union[Type[Struct]]|Type|None, spec: (
Union[Type[Struct]]
|Type # lone type
|None # implying `Union[*ext_types]|None`
),
# NOTE, required for ad-hoc type extensions to the underlying # NOTE, required for ad-hoc type extensions to the underlying
# serialization proto (which is default `msgpack`), # serialization proto (which is default `msgpack`),
@ -194,16 +198,18 @@ def mk_dec(
Create an IPC msg decoder, a slightly higher level wrapper around Create an IPC msg decoder, a slightly higher level wrapper around
a `msgspec.msgpack.Decoder` which provides, a `msgspec.msgpack.Decoder` which provides,
- easier introspection of the underlying type spec via - easier introspection of the underlying type spec via the
the `.spec` and `.spec_str` attrs, `.spec` and `.spec_str` attrs,
- `.hook` access to the `Decoder.dec_hook()`, - `.hook` access to the `Decoder.dec_hook()`,
- automatic custom extension-types decode support when - automatic custom extension-types decode support when
`dec_hook()` is provided such that any `PayloadMsg.pld` tagged `dec_hook()` is provided such that any `PayloadMsg.pld` tagged
as a type from from `ext_types` (presuming the `MsgCodec.encode()` also used as a type from from `ext_types` (presuming the
a `.enc_hook()`) is processed and constructed by a `PldRx` implicitily. `MsgCodec.encode()` also used a `.enc_hook()`) is processed and
constructed by a `PldRx` implicitily.
NOTE, as mentioned a `MsgDec` is normally used for `PayloadMsg.pld: PayloadT` field NOTE, as mentioned a `MsgDec` is normally used for
decoding inside an IPC-ctx-oriented `PldRx`. `PayloadMsg.pld: PayloadT` field decoding inside an
IPC-ctx-oriented `PldRx`.
''' '''
if ( if (
@ -248,12 +254,16 @@ def mk_dec(
# will work? kk B) # will work? kk B)
# #
# maybe_box_struct = mk_boxed_ext_struct(ext_types) # maybe_box_struct = mk_boxed_ext_struct(ext_types)
spec = Raw | Union[*ext_types]
spec = spec | Union[*ext_types]
return MsgDec( return MsgDec(
_dec=msgpack.Decoder( _dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]` type=spec,
dec_hook=dec_hook, dec_hook=dec_hook,
# ?TODO, support it?
# https://jcristharif.com/msgspec/usage.html#strict-vs-lax-mode
# strict=False,
), ),
) )

View File

@ -33,9 +33,7 @@ converters,
|_ https://jcristharif.com/msgspec/changelog.html |_ https://jcristharif.com/msgspec/changelog.html
''' '''
from types import ( import types
ModuleType,
)
import typing import typing
from typing import ( from typing import (
Type, Type,
@ -44,35 +42,51 @@ from typing import (
def dec_type_union( def dec_type_union(
type_names: list[str], type_names: list[str],
mods: list[ModuleType] = [] mods: list[types.ModuleType] = []
) -> Type|Union[Type]: ) -> Type|Union[Type]:
''' '''
Look up types by name, compile into a list and then create and Look up types by name, compile into a list and then create and
return a `typing.Union` from the full set. return a `typing.Union` from the full set.
''' '''
# import importlib _types: list[Type] = []
types: list[Type] = []
for type_name in type_names: for type_name in type_names:
for mod in [ for mod in [
typing, typing,
# importlib.import_module(__name__), types,
] + mods: ] + mods:
if type_ref := getattr( if type_ref := getattr(
mod, mod,
type_name, type_name,
False, False,
): ):
types.append(type_ref) _types.append(type_ref)
break
# special case handling only.. report: str = ''
# ipc_pld_spec: Union[Type] = eval( if not _types:
# pld_spec_str, report: str = 'No type-instances could be resolved from `type_names` ??\n'
# {}, # globals
# {'typing': typing}, # locals
# )
return Union[*types] elif len(type_names) != len(_types):
report: str = (
f'Some type-instances could not be resolved from `type_names` ??\n'
f'_types: {_types!r}\n'
)
if report:
raise ValueError(
report
+
f'type_names: {type_names!r}\n'
)
if not _types:
raise ValueError(
f'No type-instance could be resolved from `type_names` ??\n'
f'type_names: {type_names!r}\n'
)
return Union[*_types]
def enc_type_union( def enc_type_union(

View File

@ -119,7 +119,7 @@ class PldRx(Struct):
def limit_plds( def limit_plds(
self, self,
spec: Union[Type[Struct]], spec: Union[Type[Struct]],
**dec_kwargs, **mk_dec_kwargs,
) -> MsgDec: ) -> MsgDec:
''' '''
@ -135,7 +135,7 @@ class PldRx(Struct):
orig_dec: MsgDec = self._pld_dec orig_dec: MsgDec = self._pld_dec
limit_dec: MsgDec = mk_dec( limit_dec: MsgDec = mk_dec(
spec=spec, spec=spec,
**dec_kwargs, **mk_dec_kwargs,
) )
try: try:
self._pld_dec = limit_dec self._pld_dec = limit_dec
@ -582,6 +582,7 @@ async def drain_to_final_msg(
even after ctx closure and the `.open_context()` block exit. even after ctx closure and the `.open_context()` block exit.
''' '''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns raise_overrun: bool = not ctx._allow_overruns
parent_never_opened_stream: bool = ctx._stream is None parent_never_opened_stream: bool = ctx._stream is None
@ -834,7 +835,8 @@ async def drain_to_final_msg(
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )
__tracebackhide__: bool = hide_tb # ?TODO? why was this here and not above?
# __tracebackhide__: bool = hide_tb
return ( return (
result_msg, result_msg,
pre_result_drained, pre_result_drained,

View File

@ -94,10 +94,14 @@ else:
QueueShutDown = False QueueShutDown = False
# TODO, generally speaking we can generalize this abstraction, a "SC linked # TODO, generally speaking we can generalize this abstraction as,
# parent->child task pair", as the same "supervision scope primitive" #
# **that is** our `._context.Context` with the only difference being # > A "SC linked, inter-event-loop" channel for comms between
# in how the tasks conduct msg-passing comms. # > a `parent: trio.Task` -> `child: asyncio.Task` pair.
#
# It is **very similar** in terms of its operation as a "supervision
# scope primitive" to that of our `._context.Context` with the only
# difference being in how the tasks conduct msg-passing comms.
# #
# For `LinkedTaskChannel` we are passing the equivalent of (once you # For `LinkedTaskChannel` we are passing the equivalent of (once you
# include all the recently added `._trio/aio_to_raise` # include all the recently added `._trio/aio_to_raise`
@ -122,6 +126,7 @@ class LinkedTaskChannel(
task scheduled in the host loop. task scheduled in the host loop.
''' '''
# ?TODO, rename as `._aio_q` since it's 2-way?
_to_aio: asyncio.Queue _to_aio: asyncio.Queue
_from_aio: trio.MemoryReceiveChannel _from_aio: trio.MemoryReceiveChannel
@ -235,9 +240,11 @@ class LinkedTaskChannel(
# #
async def receive(self) -> Any: async def receive(self) -> Any:
''' '''
Receive a value from the paired `asyncio.Task` with Receive a value `trio.Task` <- `asyncio.Task`.
Note the tasks in each loop are "SC linked" as a pair with
exception/cancel handling to teardown both sides on any exception/cancel handling to teardown both sides on any
unexpected error. unexpected error or cancellation.
''' '''
try: try:
@ -261,15 +268,42 @@ class LinkedTaskChannel(
): ):
raise err raise err
async def get(self) -> Any:
'''
Receive a value `asyncio.Task` <- `trio.Task`.
This is equiv to `await self._from_trio.get()`.
'''
return await self._to_aio.get()
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
''' '''
Send a value through to the asyncio task presuming Send a value through `trio.Task` -> `asyncio.Task`
it defines a ``from_trio`` argument, if it does not presuming
it defines a `from_trio` argument or makes calls
to `chan.get()` , if it does not
this method will raise an error. this method will raise an error.
''' '''
self._to_aio.put_nowait(item) self._to_aio.put_nowait(item)
# TODO? could we only compile-in this method on an instance
# handed to the `asyncio`-side, i.e. the fn invoked with
# `.open_channel_from()`.
def send_nowait(
self,
item: Any,
) -> None:
'''
Send a value through FROM the `asyncio.Task` to
the `trio.Task` NON-BLOCKING.
This is equiv to `self._to_trio.send_nowait()`.
'''
self._to_trio.send_nowait(item)
# TODO? needed? # TODO? needed?
# async def wait_aio_complete(self) -> None: # async def wait_aio_complete(self) -> None:
# await self._aio_task_complete.wait() # await self._aio_task_complete.wait()
@ -337,9 +371,12 @@ def _run_asyncio_task(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
if not tractor.current_actor().is_infected_aio(): if not (actor := tractor.current_actor()).is_infected_aio():
raise RuntimeError( raise RuntimeError(
"`infect_asyncio` mode is not enabled!?" f'`infect_asyncio: bool` mode is not enabled ??\n'
f'Ensure you pass `ActorNursery.start_actor(infect_asyncio=True)`\n'
f'\n'
f'{actor}\n'
) )
# ITC (inter task comms), these channel/queue names are mostly from # ITC (inter task comms), these channel/queue names are mostly from
@ -1666,7 +1703,7 @@ def run_as_asyncio_guest(
# asyncio.CancelledError, # asyncio.CancelledError,
# ^^XXX `.shield()` call above prevents this?? # ^^XXX `.shield()` call above prevents this??
)as state_err: ) as state_err:
# XXX be super dupere noisy about abandonment issues! # XXX be super dupere noisy about abandonment issues!
aio_task: asyncio.Task = asyncio.current_task() aio_task: asyncio.Task = asyncio.current_task()