Compare commits


1 Commit

Author: Tyler Goodlet
SHA1: a870df68c0

Hack `asyncio` to not abandon a guest-mode run?

Took me a while to figure out what the heck was going on, but it turns out
`asyncio` changed their SIGINT handling in 3.11 as per:

https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption

I'm not entirely sure if it's the 3.11 changes or possibly whatever further
updates were made in 3.12, but more or less due to the way our current main
task was written, the `trio` guest-run was getting abandoned on SIGINTs sent
from the OS to the infected child proc..
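
For context, here's a tiny standalone demo (not `tractor` code; Unix and
3.11+ assumed) of the behaviour those docs describe: the runner's SIGINT
handler first cancels the main task and only re-raises `KeyboardInterrupt`
once that task has finished unwinding:

    import asyncio
    import os
    import signal

    async def main():
        # deliver a SIGINT to ourselves shortly after starting
        asyncio.get_running_loop().call_later(
            0.1, os.kill, os.getpid(), signal.SIGINT,
        )
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('main task cancelled by the runner SIGINT handler')
            raise

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('KeyboardInterrupt re-raised only after the task unwound')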

Note that much of the bug and solution cases are laid out in very detailed
comment-notes both in the new test and in `run_as_asyncio_guest()`, right
above the final "fix" lines.

Add a new `test_infected_aio.test_sigint_closes_lifetime_stack()` test suite
which reliably triggers all abandonment issues with multiple cases
of different parent behaviour post-sending-SIGINT-to-child (a rough
parent-side sketch follows the list below):
 1. briefly sleep then raise a KBI in the parent, which originally
    demonstrated the file leak not being cleaned up by `Actor.lifetime_stack.close()`
    and simulates a ctrl-c from the console (relayed in tandem by
    the OS to the parent and child processes).
 2. do `Context.wait_for_result()` on the child context, which would
    hang and time out since the actor runtime would never complete and
    thus never relay a `ContextCancelled`.
 3. both with and without running an `asyncio` task in the `manage_file`
    child actor; originally it seemed that with an aio task scheduled in
    the child actor the guest-run abandonment was always the "loud" case
    where there seemed to be some actor teardown but with tracebacks from
    python failing to gracefully exit the `trio` runtime..
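
A rough parent-side sketch of those cases (hedged: the `manage_file`
`@tractor.context` endpoint shown here and its reporting of the child's
OS pid as the started-value are illustrative assumptions, not the exact
test code):

    import os
    import signal

    import trio
    import tractor

    @tractor.context
    async def manage_file(ctx: tractor.Context):
        # child side (sketch): register a file on the actor's lifetime
        # stack, report our OS pid back, then wait to be cancelled.
        tmp = open('/tmp/sigint_sketch.txt', 'w')
        tractor.current_actor().lifetime_stack.callback(tmp.close)
        await ctx.started(os.getpid())
        await trio.sleep_forever()

    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'file_mngr',
                enable_modules=[__name__],
                infect_asyncio=True,  # run the child's `trio` side as an `asyncio` guest
            )
            async with portal.open_context(manage_file) as (ctx, child_pid):
                os.kill(child_pid, signal.SIGINT)

                # case 1: briefly sleep then raise a KBI in the parent,
                # simulating a console ctrl-c relayed to both processes.
                await trio.sleep(0.5)
                raise KeyboardInterrupt

                # case 2 (alternative): wait on the child's final result,
                # which hangs (and should be timeout-guarded) since the
                # child runtime never completes nor relays a
                # `ContextCancelled`.
                # await ctx.wait_for_result()

    if __name__ == '__main__':
        trio.run(main)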

The (seemingly working) "fix" required 2 lines of code to be run inside
an `asyncio.CancelledError` handler around the call to `await trio_done_fut`
(see the sketch right after this list):
- `Actor.cancel_soon()` which schedules the actor runtime to cancel on
  the next `trio` runner cycle and results in a "self cancellation" of
  the actor.
- "pumping the `asyncio` event loop" with a non-0 `.sleep(0.1)` XD
 |_ seems that a "shielded" pump with some actual `delay: float >= 0`
   did the trick to get `asyncio` to allow the `trio` runner/loop to
   fully complete its guest-run without abandonment.
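
A hedged sketch of that handler pattern (the wrapper shape and names here
are illustrative; the real code lives in `run_as_asyncio_guest()` and
differs in detail):

    import asyncio

    import trio
    import tractor
    from outcome import Outcome

    async def _guest_run_sketch(trio_main) -> None:
        loop = asyncio.get_running_loop()
        trio_done_fut: asyncio.Future = asyncio.Future()

        def trio_done_callback(main_outcome: Outcome):
            # boxed result/error of the `trio` side, set on completion
            trio_done_fut.set_result(main_outcome)

        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=trio_done_callback,
        )
        try:
            # the shield-wait mentioned under "supporting changes" below
            out: Outcome = await asyncio.shield(trio_done_fut)

        except asyncio.CancelledError:
            # fix line 1: schedule a runtime "self cancellation" on the
            # next `trio` runner cycle.
            tractor.current_actor().cancel_soon()

            # fix line 2: "pump" the `asyncio` loop with a shielded,
            # non-zero sleep so the guest-run can fully complete instead
            # of being abandoned.
            await asyncio.shield(asyncio.sleep(0.1))

            out = await trio_done_fut

        # re-raise any error boxed by the `trio` side
        out.unwrap()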

Other supporting changes:
- move `._exceptions.AsyncioCancelled`, our renamed
  `asyncio.CancelledError` error-sub-type-wrapper, to `.to_asyncio` and make
  it derive from `CancelledError` so that, when raised by our
  `asyncio` x-> `trio` exception relay machinery, `asyncio` gets the
  specific type it expects during cancellation (see the small sketch
  after this list).
- do "summary status" style logging in `run_as_asyncio_guest()` wherein
  we compile the eventual `startup_msg: str` emitted just before waiting
  on the `trio_done_fut`.
- shield-wait with `out: Outcome = await asyncio.shield(trio_done_fut)`
  even though it seems to do nothing in the SIGINT handling case.. (I
  presume it might help avoid abandonment in an `asyncio.Task.cancel()`
  case maybe?)
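
For reference, a minimal sketch of why that sub-typing matters (the real
class now lives in `.to_asyncio` with more detail):

    import asyncio

    class AsyncioCancelled(asyncio.CancelledError):
        '''
        Raised by the `asyncio` x-> `trio` relay so that `asyncio`-side
        machinery still sees the exact exception type it expects during
        cancellation.

        '''

    # any `except asyncio.CancelledError:` clause inside the aio runtime
    # still catches the wrapper since it derives from `CancelledError`:
    assert issubclass(AsyncioCancelled, asyncio.CancelledError)
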
Date: 2024-06-24 16:10:23 -04:00
26 changed files with 238 additions and 754 deletions

View File

@ -1,7 +1,6 @@
"""
``tractor`` testing!!
"""
from functools import partial
import sys
import subprocess
import os
@ -9,9 +8,6 @@ import random
import signal
import platform
import time
from typing import (
AsyncContextManager,
)
import pytest
import tractor
@ -154,18 +150,6 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize("start_method", [spawn_backend], scope='module')
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
@pytest.fixture
def open_test_runtime(
reg_addr: tuple,
) -> AsyncContextManager:
return partial(
tractor.open_nursery,
registry_addrs=[reg_addr],
)
def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)

View File

@ -41,7 +41,7 @@ from tractor.msg import (
from tractor.msg.types import (
_payload_msgs,
log,
PayloadMsg,
Msg,
Started,
mk_msg_spec,
)
@ -61,7 +61,7 @@ def mk_custom_codec(
uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
@ -321,12 +321,12 @@ def dec_type_union(
import importlib
types: list[Type] = []
for type_name in type_names:
for mod in [
for ns in [
typing,
importlib.import_module(__name__),
]:
if type_ref := getattr(
mod,
ns,
type_name,
False,
):
@ -744,7 +744,7 @@ def chk_pld_type(
# 'Error', .pld: ErrorData
codec: MsgCodec = mk_codec(
# NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
# NOTE: this ONLY accepts `Msg.pld` fields of a specified
# type union.
ipc_pld_spec=payload_spec,
)
@ -752,7 +752,7 @@ def chk_pld_type(
# make a one-off dec to compare with our `MsgCodec` instance
# which does the below `mk_msg_spec()` call internally
ipc_msg_spec: Union[Type[Struct]]
msg_types: list[PayloadMsg[payload_spec]]
msg_types: list[Msg[payload_spec]]
(
ipc_msg_spec,
msg_types,
@ -761,7 +761,7 @@ def chk_pld_type(
)
_enc = msgpack.Encoder()
_dec = msgpack.Decoder(
type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
type=ipc_msg_spec or Any, # like `Msg[Any]`
)
assert (
@ -806,7 +806,7 @@ def chk_pld_type(
'cid': '666',
'pld': pld,
}
enc_msg: PayloadMsg = typedef(**kwargs)
enc_msg: Msg = typedef(**kwargs)
_wire_bytes: bytes = _enc.encode(enc_msg)
wire_bytes: bytes = codec.enc.encode(enc_msg)
@ -883,16 +883,25 @@ def test_limit_msgspec():
debug_mode=True
):
# ensure we can round-trip a boxing `PayloadMsg`
# ensure we can round-trip a boxing `Msg`
assert chk_pld_type(
payload_spec=Any,
pld=None,
# Msg,
Any,
None,
expect_roundtrip=True,
)
# TODO: don't need this any more right since
# `msgspec>=0.15` has the nice generics stuff yah??
#
# manually override the type annot of the payload
# field and ensure it propagates to all msg-subtypes.
# Msg.__annotations__['pld'] = Any
# verify that a mis-typed payload value won't decode
assert not chk_pld_type(
payload_spec=int,
# Msg,
int,
pld='doggy',
)
@ -904,16 +913,18 @@ def test_limit_msgspec():
value: Any
assert not chk_pld_type(
payload_spec=CustomPayload,
# Msg,
CustomPayload,
pld='doggy',
)
assert chk_pld_type(
payload_spec=CustomPayload,
# Msg,
CustomPayload,
pld=CustomPayload(name='doggy', value='urmom')
)
# yah, we can `.pause_from_sync()` now!
# uhh bc we can `.pause_from_sync()` now! :surfer:
# breakpoint()
trio.run(main)

View File

@ -1336,23 +1336,6 @@ def test_shield_pause(
child.expect(pexpect.EOF)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead
# (in terms of `greenback` and/or extra threads) and if it's from
# a sync scope suggest that usage must first call
# `ensure_portal()` in the (eventual parent) async calling scope?
def test_sync_pause_from_bg_task_in_root_actor_():
'''
When used from the root actor, normally we can only implicitly
support `.pause_from_sync()` from the main-parent-task (that
opens the runtime via `open_root_actor()`) since `greenback`
requires a `.ensure_portal()` call per `trio.Task` where it is
used.
'''
...
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
def test_correct_frames_below_hidden():
'''

View File

@ -19,7 +19,7 @@ from tractor._testing import (
@pytest.fixture
def run_example_in_subproc(
loglevel: str,
testdir: pytest.Testdir,
testdir,
reg_addr: tuple[str, int],
):

View File

@ -121,19 +121,10 @@ class Unresolved:
@dataclass
class Context:
'''
An inter-actor, SC transitive, `trio.Task` (pair)
communication context.
An inter-actor, SC transitive, `Task` communication context.
(We've also considered other names and ideas:
- "communicating tasks scope": cts
- "distributed task scope": dts
- "communicating tasks context": ctc
**Got a better idea for naming? Make an issue dawg!**
)
NB: This class should **never be instatiated directly**, it is
allocated by the runtime in 2 ways:
NB: This class should **never be instatiated directly**, it is allocated
by the runtime in 2 ways:
- by entering `Portal.open_context()` which is the primary
public API for any "parent" task or,
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
@ -219,16 +210,6 @@ class Context:
# more the the `Context` is needed?
_portal: Portal | None = None
@property
def portal(self) -> Portal|None:
'''
Return any wrapping memory-`Portal` if this is
a 'parent'-side task which called `Portal.open_context()`,
otherwise `None`.
'''
return self._portal
# NOTE: each side of the context has its own cancel scope
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
@ -318,8 +299,6 @@ class Context:
# boxed exception. NOW, it's used for spawning overrun queuing
# tasks when `.allow_overruns == True` !!!
_scope_nursery: trio.Nursery|None = None
# ^-TODO-^ change name?
# -> `._scope_tn` "scope task nursery"
# streaming overrun state tracking
_in_overrun: bool = False
@ -429,23 +408,10 @@ class Context:
'''
return self._cancel_called
@cancel_called.setter
def cancel_called(self, val: bool) -> None:
'''
Set the self-cancelled request `bool` value.
'''
# to debug who frickin sets it..
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
self._cancel_called = val
@property
def canceller(self) -> tuple[str, str]|None:
'''
`Actor.uid: tuple[str, str]` of the (remote)
``Actor.uid: tuple[str, str]`` of the (remote)
actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled.
@ -549,7 +515,7 @@ class Context:
# the local scope was never cancelled
# and instead likely we received a remote side
# # cancellation that was raised inside `.wait_for_result()`
# # cancellation that was raised inside `.result()`
# or (
# (se := self._local_error)
# and se is re
@ -619,8 +585,6 @@ class Context:
self,
error: BaseException,
set_cancel_called: bool = False,
) -> None:
'''
(Maybe) cancel this local scope due to a received remote
@ -639,7 +603,7 @@ class Context:
- `Portal.open_context()`
- `Portal.result()`
- `Context.open_stream()`
- `Context.wait_for_result()`
- `Context.result()`
when called/closed by actor local task(s).
@ -765,7 +729,7 @@ class Context:
# Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the
# once exiting (or manually calling `.result()`) the
# `.open_context()` block.
cs: trio.CancelScope = self._scope
if (
@ -800,9 +764,8 @@ class Context:
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
# from .devx import pause_from_sync
# pause_from_sync()
self._scope.cancel()
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
@ -926,7 +889,7 @@ class Context:
'''
side: str = self.side
self.cancel_called: bool = True
self._cancel_called: bool = True
header: str = (
f'Cancelling ctx with peer from {side.upper()} side\n\n'
@ -949,7 +912,7 @@ class Context:
# `._scope.cancel()` since we expect the eventual
# `ContextCancelled` from the other side to trigger this
# when the runtime finally receives it during teardown
# (normally in `.wait_for_result()` called from
# (normally in `.result()` called from
# `Portal.open_context().__aexit__()`)
if side == 'parent':
if not self._portal:
@ -1062,10 +1025,10 @@ class Context:
'''
__tracebackhide__: bool = hide_tb
peer_uid: tuple = self.chan.uid
our_uid: tuple = self.chan.uid
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# for "graceful cancellation" case(s):
# for "graceful cancellation" case:
#
# Whenever a "side" of a context (a `Task` running in
# an actor) **is** the side which requested ctx
@ -1082,11 +1045,9 @@ class Context:
# set to the `Actor.uid` of THIS task (i.e. the
# cancellation requesting task's actor is the actor
# checking whether it should absorb the ctxc).
self_ctxc: bool = self._is_self_cancelled(remote_error)
if (
self_ctxc
and
not raise_ctxc_from_self_call
and self._is_self_cancelled(remote_error)
# TODO: ?potentially it is useful to emit certain
# warning/cancel logs for the cases where the
@ -1116,8 +1077,8 @@ class Context:
and isinstance(remote_error, RemoteActorError)
and remote_error.boxed_type is StreamOverrun
# and tuple(remote_error.msgdata['sender']) == peer_uid
and tuple(remote_error.sender) == peer_uid
# and tuple(remote_error.msgdata['sender']) == our_uid
and tuple(remote_error.sender) == our_uid
):
# NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing"
@ -1179,9 +1140,9 @@ class Context:
of the remote cancellation.
'''
__tracebackhide__: bool = False
__tracebackhide__ = hide_tb
assert self._portal, (
'`Context.wait_for_result()` can not be called from callee side!'
"Context.result() can not be called from callee side!"
)
if self._final_result_is_set():
return self._result
@ -1208,8 +1169,7 @@ class Context:
drained_msgs,
) = await msgops.drain_to_final_msg(
ctx=self,
# hide_tb=hide_tb,
hide_tb=False,
hide_tb=hide_tb,
)
drained_status: str = (
@ -1225,8 +1185,6 @@ class Context:
log.cancel(drained_status)
# __tracebackhide__: bool = hide_tb
self.maybe_raise(
# NOTE: obvi we don't care if we
# overran the far end if we're already
@ -1239,8 +1197,7 @@ class Context:
# raising something we know might happen
# during cancellation ;)
(not self._cancel_called)
),
hide_tb=hide_tb,
)
)
# TODO: eventually make `.outcome: Outcome` and thus return
# `self.outcome.unwrap()` here!
@ -1626,7 +1583,7 @@ class Context:
- NEVER `return` early before delivering the msg!
bc if the error is a ctxc and there is a task waiting on
`.wait_for_result()` we need the msg to be
`.result()` we need the msg to be
`send_chan.send_nowait()`-ed over the `._rx_chan` so
that the error is relayed to that waiter task and thus
raised in user code!
@ -1871,7 +1828,7 @@ async def open_context_from_portal(
When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be
retrieved using the `Contex.wait_for_result()` api.
retrieved using the `Contex.result()` api.
The yielded ``Context`` instance further allows for opening
bidirectional streams, explicit cancellation and
@ -2008,14 +1965,14 @@ async def open_context_from_portal(
yield ctx, first
# ??TODO??: do we still want to consider this or is
# the `else:` block handling via a `.wait_for_result()`
# the `else:` block handling via a `.result()`
# call below enough??
#
# -[ ] pretty sure `.wait_for_result()` internals do the
# -[ ] pretty sure `.result()` internals do the
# same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we
# wouldn't have that duplication either by somehow
# factoring the `.wait_for_result()` handler impl in a way
# factoring the `.result()` handler impl in a way
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
@ -2153,7 +2110,7 @@ async def open_context_from_portal(
# AND a group-exc is only raised if there was > 1
# tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls
# `.wait_for_result()` or `MsgStream.receive()`
# `.result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively
# called and the remote error raised causing all
# tasks to be cancelled.
@ -2223,7 +2180,7 @@ async def open_context_from_portal(
f'|_{ctx._task}\n'
)
# XXX NOTE XXX: the below call to
# `Context.wait_for_result()` will ALWAYS raise
# `Context.result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
@ -2233,10 +2190,10 @@ async def open_context_from_portal(
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
try:
result_or_err: Exception|Any = await ctx.wait_for_result()
result_or_err: Exception|Any = await ctx.result()
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.wait_for_result()` we still want to
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but

View File

@ -56,12 +56,14 @@ async def get_registry(
]:
'''
Return a portal instance connected to a local or remote
registry-service actor; if a connection already exists re-use it
(presumably to call a `.register_actor()` registry runtime RPC
ep).
arbiter.
'''
actor: Actor = current_actor()
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
@ -70,8 +72,6 @@ async def get_registry(
Channel((host, port))
)
else:
# TODO: try to look pre-existing connection from
# `Actor._peers` and use it instead?
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl,

View File

@ -20,8 +20,7 @@ Sub-process entry points.
"""
from __future__ import annotations
from functools import partial
import os
import textwrap
# import textwrap
from typing import (
Any,
TYPE_CHECKING,
@ -59,7 +58,7 @@ def _mp_main(
) -> None:
'''
The routine called *after fork* which invokes a fresh `trio.run()`
The routine called *after fork* which invokes a fresh ``trio.run``
'''
actor._forkserver_info = forkserver_info
@ -97,35 +96,6 @@ def _mp_main(
log.info(f"Subactor {actor.uid} terminated")
# TODO: move this to some kinda `.devx._conc_lang.py` eventually
# as we work out our multi-domain state-flow-syntax!
def nest_from_op(
input_op: str,
tree_str: str,
back_from_op: int = 1,
) -> str:
'''
Depth-increment the input (presumably hierarchy/supervision)
input "tree string" below the provided `input_op` execution
operator, so injecting a `"\n|_{input_op}\n"`and indenting the
`tree_str` to nest content aligned with the ops last char.
'''
return (
f'{input_op}\n'
+
textwrap.indent(
tree_str,
prefix=(
len(input_op)
-
back_from_op
) *' ',
)
)
def _trio_main(
actor: Actor,
*,
@ -149,6 +119,7 @@ def _trio_main(
if actor.loglevel is not None:
get_console_log(actor.loglevel)
import os
actor_info: str = (
f'|_{actor}\n'
f' uid: {actor.uid}\n'
@ -157,29 +128,13 @@ def _trio_main(
f' loglevel: {actor.loglevel}\n'
)
log.info(
'Started new `trio` subactor:\n'
'Started new trio subactor:\n'
+
nest_from_op(
input_op='(>', # like a "started/play"-icon from super perspective
tree_str=actor_info,
)
# '>(\n' # like a "started/play"-icon from super perspective
# +
# actor_info,
'>\n' # like a "started/play"-icon from super perspective
+
actor_info,
)
logmeth = log.info
message: str = (
# log.info(
'Subactor terminated\n'
+
nest_from_op(
input_op=')>', # like a "started/play"-icon from super perspective
tree_str=actor_info,
)
# 'x\n' # like a "crossed-out/killed" from super perspective
# +
# actor_info
)
try:
if infect_asyncio:
actor._infected_aio = True
@ -188,18 +143,16 @@ def _trio_main(
trio.run(trio_main)
except KeyboardInterrupt:
logmeth = log.cancel
message: str = (
'Actor received KBI (aka an OS-cancel)\n'
log.cancel(
'Actor received KBI\n'
+
nest_from_op(
input_op='c)>', # like a "started/play"-icon from super perspective
tree_str=actor_info,
)
actor_info
)
except BaseException:
log.exception('Actor crashed exit?')
raise
finally:
logmeth(message)
log.info(
'Subactor terminated\n'
+
'x\n' # like a "crossed-out/killed" from super perspective
+
actor_info
)

View File

@ -40,7 +40,6 @@ from typing import (
TypeVar,
)
# import pdbp
import msgspec
from tricycle import BufferedReceiveStream
import trio
@ -291,14 +290,12 @@ class MsgpackTCPStream(MsgTransport):
else:
raise
# @pdbp.hideframe
async def send(
self,
msg: msgtypes.MsgType,
strict_types: bool = True,
hide_tb: bool = False,
# hide_tb: bool = False,
) -> None:
'''
Send a msgpack encoded py-object-blob-as-msg over TCP.
@ -307,10 +304,7 @@ class MsgpackTCPStream(MsgTransport):
invalid msg type
'''
__tracebackhide__: bool = hide_tb
# try:
# XXX see `trio._sync.AsyncContextManagerMixin` for details
# on the `.acquire()`/`.release()` sequencing..
# __tracebackhide__: bool = hide_tb
async with self._send_lock:
# NOTE: lookup the `trio.Task.context`'s var for
@ -358,14 +352,6 @@ class MsgpackTCPStream(MsgTransport):
size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
# TODO: does it help ever to dynamically show this
# frame?
# except BaseException as _err:
# err = _err
# if not isinstance(err, MsgTypeError):
# __tracebackhide__: bool = False
# raise
@property
def laddr(self) -> tuple[str, int]:
return self._laddr
@ -574,40 +560,27 @@ class Channel:
)
return transport
# TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])`
# @pdbp.hideframe
async def send(
self,
payload: Any,
hide_tb: bool = False,
# hide_tb: bool = False,
) -> None:
'''
Send a coded msg-blob over the transport.
'''
__tracebackhide__: bool = hide_tb
try:
log.transport(
'=> send IPC msg:\n\n'
f'{pformat(payload)}\n'
)
# assert self._transport # but why typing?
await self._transport.send(
payload,
hide_tb=hide_tb,
)
except BaseException as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
assert err.cid
raise
# __tracebackhide__: bool = hide_tb
log.transport(
'=> send IPC msg:\n\n'
f'{pformat(payload)}\n'
) # type: ignore
assert self._transport
await self._transport.send(
payload,
# hide_tb=hide_tb,
)
async def recv(self) -> Any:
assert self._transport

View File

@ -121,8 +121,7 @@ class Portal:
)
return self.chan
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
# TODO: factor this out into an `ActorNursery` wrapper
async def _submit_for_result(
self,
ns: str,
@ -142,22 +141,13 @@ class Portal:
portal=self,
)
# TODO: we should deprecate this API right? since if we remove
# `.run_in_actor()` (and instead move it to a `.highlevel`
# wrapper api (around a single `.open_context()` call) we don't
# really have any notion of a "main" remote task any more?
#
# @api_frame
async def wait_for_result(
self,
hide_tb: bool = True,
) -> Any:
async def result(self) -> Any:
'''
Return the final result delivered by a `Return`-msg from the
remote peer actor's "main" task's `return` statement.
Return the result(s) from the remote actor's "main" task.
'''
__tracebackhide__: bool = hide_tb
__tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
@ -192,23 +182,6 @@ class Portal:
return self._final_result_pld
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def result(
self,
*args,
**kwargs,
) -> Any|Exception:
typname: str = type(self).__name__
log.warning(
f'`{typname}.result()` is DEPRECATED!\n'
'Use `{typname.wait_for_result()` instead!\n'
)
return await self.wait_for_result(
*args,
**kwargs,
)
async def _cancel_streams(self):
# terminate all locally running async generator
# IPC calls
@ -267,7 +240,6 @@ class Portal:
f'{reminfo}'
)
# XXX the one spot we set it?
self.channel._cancel_called: bool = True
try:
# send cancel cmd - might not get response
@ -307,8 +279,6 @@ class Portal:
)
return False
# TODO: do we still need this for low level `Actor`-runtime
# method calls or can we also remove it?
async def run_from_ns(
self,
namespace_path: str,
@ -346,8 +316,6 @@ class Portal:
expect_msg=Return,
)
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def run(
self,
func: str,
@ -402,8 +370,6 @@ class Portal:
expect_msg=Return,
)
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
@acm
async def open_stream_from(
self,

View File

@ -21,7 +21,6 @@ Root actor runtime ignition(s).
from contextlib import asynccontextmanager as acm
from functools import partial
import importlib
import inspect
import logging
import os
import signal
@ -116,16 +115,10 @@ async def open_root_actor(
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
and await _debug.maybe_init_greenback(
raise_not_found=False,
)
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
@ -271,10 +264,7 @@ async def open_root_actor(
except OSError:
# TODO: make this a "discovery" log level?
logger.info(
f'No actor registry found @ {addr}\n'
# 'Registry will be initialized in local actor..'
)
logger.warning(f'No actor registry found @ {addr}')
async with trio.open_nursery() as tn:
for addr in registry_addrs:
@ -375,25 +365,23 @@ async def open_root_actor(
)
try:
yield actor
except (
Exception,
BaseExceptionGroup,
) as err:
# XXX NOTE XXX see equiv note inside
# `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-mahually case we
# wait for the local actor-nursery to exit before
# exiting the transport channel handler.
import inspect
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
if (
not entered
and
not is_multi_cancelled(err)
and not is_multi_cancelled(err)
):
logger.exception('Root actor crashed\n')
logger.exception('Root actor crashed:\n')
# ALWAYS re-raise any error bubbled up from the
# runtime!

View File

@ -89,15 +89,6 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# TODO: move to a `tractor.lowlevel.rpc` with the below
# func-type-cases implemented "on top of" `@context` defs.
# -[ ] std async func
# -[ ] `Portal.open_stream_from()` with async-gens?
# |_ possibly a duplex form of this with a
# `sent_from_peer = yield send_to_peer` form, which would require
# syncing the send/recv side with possibly `.receive_nowait()`
# on each `yield`?
# -[ ]
async def _invoke_non_context(
actor: Actor,
cancel_scope: CancelScope,
@ -117,7 +108,6 @@ async def _invoke_non_context(
] = trio.TASK_STATUS_IGNORED,
):
__tracebackhide__: bool = True
cs: CancelScope|None = None # ref when activated
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
@ -170,6 +160,10 @@ async def _invoke_non_context(
functype='asyncgen',
)
)
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
@ -181,13 +175,15 @@ async def _invoke_non_context(
await chan.send(
Stop(cid=cid)
)
# simplest function/method request-response pattern
# XXX: in the most minimally used case, just a scheduled internal runtime
# call to `Actor._cancel_task()` from the ctx-peer task since we
# don't (yet) have a dedicated IPC msg.
# ------ - ------
else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
failed_resp: bool = False
try:
ack = StartAck(
@ -358,14 +354,8 @@ async def _errors_relayed_via_ipc(
# channel.
task_status.started(err)
# always reraise KBIs so they propagate at the sys-process
# level.
# XXX LOL, except when running in asyncio mode XD
# cmon guys, wtf..
if (
isinstance(err, KeyboardInterrupt)
# and not actor.is_infected_aio()
):
# always reraise KBIs so they propagate at the sys-process level.
if isinstance(err, KeyboardInterrupt):
raise
# RPC task bookeeping.
@ -468,6 +458,7 @@ async def _invoke(
# tb: TracebackType = None
cancel_scope = CancelScope()
cs: CancelScope|None = None # ref when activated
ctx = actor.get_context(
chan=chan,
cid=cid,
@ -616,8 +607,6 @@ async def _invoke(
# `@context` marked RPC function.
# - `._portal` is never set.
try:
tn: trio.Nursery
rpc_ctx_cs: CancelScope
async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
@ -627,7 +616,7 @@ async def _invoke(
),
):
ctx._scope_nursery = tn
rpc_ctx_cs = ctx._scope = tn.cancel_scope
ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: better `trionics` tooling:
@ -653,7 +642,7 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser.
if rpc_ctx_cs.cancelled_caught:
if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid
# first check for and raise any remote error
@ -663,7 +652,9 @@ async def _invoke(
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
if rpc_ctx_cs.cancel_called:
cs: CancelScope = ctx._scope
if cs.cancel_called:
canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -689,14 +680,8 @@ async def _invoke(
elif canceller == ctx.chan.uid:
explain += f'its {ctx.peer_side!r}-side peer'
elif canceller == our_uid:
explain += 'itself'
elif canceller:
explain += 'a remote peer'
else:
explain += 'an unknown cause?'
explain += 'a remote peer'
explain += (
add_div(message=explain)
@ -1253,7 +1238,7 @@ async def process_messages(
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
# f'{pretty_struct.pformat(msg)}'
f'{pretty_struct.pformat(msg)}'
)
log.runtime(message)

View File

@ -1046,10 +1046,6 @@ class Actor:
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']:
from .devx import (
enable_stack_on_sig,
maybe_init_greenback,
)
try:
# TODO: maybe return some status msgs upward
# to that we can emit them in `con_status`
@ -1057,27 +1053,13 @@ class Actor:
log.devx(
'Enabling `stackscope` traces on SIGUSR1'
)
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
if rvs.get('use_greenback', False):
maybe_mod: ModuleType|None = await maybe_init_greenback()
if maybe_mod:
log.devx(
'Activated `greenback` '
'for `tractor.pause_from_sync()` support!'
)
else:
rvs['use_greenback'] = False
log.warning(
'`greenback` not installed for use in debug mode!\n'
'`tractor.pause_from_sync()` not available!'
)
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -1735,8 +1717,8 @@ async def async_main(
# Register with the arbiter if we're told its addr
log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
# ^-TODO-^ we should instead show the maddr here^^
f'Registering `{actor.name}` ->\n'
f'{pformat(accept_addrs)}'
)
# TODO: ideally we don't fan out to all registrars
@ -1794,15 +1776,9 @@ async def async_main(
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as internal_err:
# ls: ExitStack = actor.lifetime_stack
# log.cancel(
# 'Closing all actor-lifetime exec scopes\n\n'
# f'|_{ls}\n'
# )
# # _debug.pause_from_sync()
# # await _debug.pause(shield=True)
# ls.close()
except Exception as err:
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not is_registered:
# TODO: I guess we could try to connect back
@ -1810,8 +1786,7 @@ async def async_main(
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {actor.reg_addrs[0]}?"
)
f"@ {actor.reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
@ -1824,44 +1799,25 @@ async def async_main(
if actor._parent_chan:
await try_ship_error_to_remote(
actor._parent_chan,
internal_err,
err,
)
# always!
match internal_err:
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'str(internal_err)'
f'str(err)'
)
case _:
log.exception("Actor errored:")
raise
finally:
teardown_msg: str = (
log.runtime(
'Runtime nursery complete'
'-> Closing all actor lifetime contexts..'
)
ls: ExitStack = actor.lifetime_stack
cbs: list[Callable] = [
repr(tup[1].__wrapped__)
for tup in ls._exit_callbacks
]
if cbs:
cbs_str: str = '\n'.join(cbs)
teardown_msg += (
'-> Closing all actor-lifetime callbacks\n\n'
f'|_{cbs_str}\n'
)
# XXX NOTE XXX this will cause an error which
# prevents any `infected_aio` actor from continuing
# and any callbacks in the `ls` here WILL NOT be
# called!!
# await _debug.pause(shield=True)
ls.close()
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
actor.lifetime_stack.close()
@ -1900,28 +1856,23 @@ async def async_main(
failed = True
if failed:
teardown_msg += (
f'-> Failed to unregister {actor.name} from '
f'registar @ {addr}\n'
log.warning(
f'Failed to unregister {actor.name} from '
f'registar @ {addr}'
)
# log.warning(
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
if any(
chan.connected() for chan in chain(*actor._peers.values())
):
teardown_msg += (
f'-> Waiting for remaining peers {actor._peers} to clear..\n'
)
log.runtime(teardown_msg)
log.runtime(
f"Waiting for remaining peers {actor._peers} to clear")
with CancelScope(shield=True):
await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
teardown_msg += ('-> All peer channels are complete\n')
teardown_msg += ('Actor runtime completed')
log.info(teardown_msg)
log.runtime("Runtime completed")
# TODO: rename to `Registry` and move to `._discovery`!

View File

@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None),
'_registry_addrs': [],
# for `tractor.pause_from_sync()` & `breakpoint()` support
# for `breakpoint()` support
'use_greenback': False,
}

View File

@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel):
@property
def ctx(self) -> Context:
'''
A read-only ref to this stream's inter-actor-task `Context`.
This stream's IPC `Context` ref.
'''
return self._ctx

View File

@ -80,7 +80,6 @@ class ActorNursery:
'''
def __init__(
self,
# TODO: maybe def these as fields of a struct looking type?
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
@ -89,10 +88,8 @@ class ActorNursery:
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
# TODO: rename to `._tn` for our conventional "task-nursery"
self._ria_nursery = ria_nursery
self._da_nursery = da_nursery
self._children: dict[
tuple[str, str],
tuple[
@ -101,13 +98,15 @@ class ActorNursery:
Portal | None,
]
] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False
self.errors = errors
self._scope_error: BaseException|None = None
self.exited = trio.Event()
self._scope_error: BaseException|None = None
# NOTE: when no explicit call is made to
# `.open_root_actor()` by application code,
@ -117,13 +116,6 @@ class ActorNursery:
# and syncing purposes to any actor opened nurseries.
self._implicit_runtime_started: bool = False
# TODO: remove the `.run_in_actor()` API and thus this 2ndary
# nursery when that API get's moved outside this primitive!
self._ria_nursery = ria_nursery
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
async def start_actor(
self,
name: str,
@ -134,14 +126,10 @@ class ActorNursery:
rpc_module_paths: list[str]|None = None,
enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor
nursery: trio.Nursery|None = None,
debug_mode: bool|None = None,
infect_asyncio: bool = False,
# TODO: ideally we can rm this once we no longer have
# a `._ria_nursery` since the dependent APIs have been
# removed!
nursery: trio.Nursery|None = None,
) -> Portal:
'''
Start a (daemon) actor: an process that has no designated
@ -212,7 +200,6 @@ class ActorNursery:
# |_ dynamic @context decoration on child side
# |_ implicit `Portal.open_context() as (ctx, first):`
# and `return first` on parent side.
# |_ mention how it's similar to `trio-parallel` API?
# -[ ] use @api_frame on the wrapper
async def run_in_actor(
self,
@ -282,14 +269,11 @@ class ActorNursery:
) -> None:
'''
Cancel this actor-nursery by instructing each subactor's
runtime to cancel and wait for all underlying sub-processes
to terminate.
Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
If `hard_kill` is set then kill the processes directly using
the spawning-backend's API/OS-machinery without any attempt
at (graceful) `trio`-style cancellation using our
`Actor.cancel()`.
If ``hard_killl`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
'''
__runtimeframe__: int = 1 # noqa
@ -645,12 +629,8 @@ async def open_nursery(
f'|_{an}\n'
)
# shutdown runtime if it was started
if implicit_runtime:
# shutdown runtime if it was started and report noisly
# that we're did so.
msg += '=> Shutting down actor runtime <=\n'
log.info(msg)
else:
# keep noise low during std operation.
log.runtime(msg)
log.info(msg)

View File

@ -29,7 +29,6 @@ from ._debug import (
shield_sigint_handler as shield_sigint_handler,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,
post_mortem as post_mortem,
mk_pdb as mk_pdb,
)

View File

@ -69,7 +69,6 @@ from trio import (
import tractor
from tractor.log import get_logger
from tractor._context import Context
from tractor import _state
from tractor._state import (
current_actor,
is_root_process,
@ -88,6 +87,9 @@ if TYPE_CHECKING:
from tractor._runtime import (
Actor,
)
from tractor.msg import (
_codec,
)
log = get_logger(__name__)
@ -1597,13 +1599,11 @@ async def _pause(
try:
task: Task = current_task()
except RuntimeError as rte:
__tracebackhide__: bool = False
log.exception('Failed to get current task?')
if actor.is_infected_aio():
# mk_pdb().set_trace()
raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported '
'directly (infected) `asyncio` tasks!'
'for infected `asyncio` mode!'
) from rte
raise
@ -2163,22 +2163,22 @@ def maybe_import_greenback(
return False
async def maybe_init_greenback(**kwargs) -> None|ModuleType:
try:
if mod := maybe_import_greenback(**kwargs):
await mod.ensure_portal()
log.devx(
'`greenback` portal opened!\n'
'Sync debug support activated!\n'
)
return mod
except BaseException:
log.exception('Failed to init `greenback`..')
raise
async def maybe_init_greenback(
**kwargs,
) -> None|ModuleType:
if mod := maybe_import_greenback(**kwargs):
await mod.ensure_portal()
log.devx(
'`greenback` portal opened!\n'
'Sync debug support activated!\n'
)
return mod
return None
async def _pause_from_bg_root_thread(
behalf_of_thread: Thread,
repl: PdbREPL,
@ -2399,37 +2399,18 @@ def pause_from_sync(
else: # we are presumably the `trio.run()` + main thread
# raises on not-found by default
greenback: ModuleType = maybe_import_greenback()
# TODO: how to ensure this is either dynamically (if
# needed) called here (in some bg tn??) or that the
# subactor always already called it?
# greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n'
repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try:
out = greenback.await_(
_pause(
debug_func=None,
repl=repl,
hide_tb=hide_tb,
called_from_sync=True,
**_pause_kwargs,
)
out = greenback.await_(
_pause(
debug_func=None,
repl=repl,
hide_tb=hide_tb,
called_from_sync=True,
**_pause_kwargs,
)
except RuntimeError as rte:
if not _state._runtime_vars.get(
'use_greenback',
False,
):
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise
)
if out:
bg_task, repl = out
assert repl is repl
@ -2820,10 +2801,10 @@ def open_crash_handler(
`trio.run()`.
'''
err: BaseException
try:
yield
except tuple(catch) as err:
if type(err) not in ignore:
pdbp.xpm()

View File

@ -234,7 +234,7 @@ def find_caller_info(
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
# TODO: -[x] move all this into new `.devx._frame_stack`!
# TODO: -[x] move all this into new `.devx._code`!
# -[ ] consider rename to _callstack?
# -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better?
@ -286,18 +286,3 @@ def api_frame(
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
wrapped.__api_func__: bool = True
return wrapper(wrapped)
# TODO: something like this instead of the adhoc frame-unhiding
# blocks all over the runtime!! XD
# -[ ] ideally we can expect a certain error (set) and if something
# else is raised then all frames below the wrapped one will be
# un-hidden via `__tracebackhide__: bool = False`.
# |_ might need to dynamically mutate the code objs like
# `pdbp.hideframe()` does?
# -[ ] use this as a `@acm` decorator as introed in 3.10?
# @acm
# async def unhide_frame_when_not(
# error_set: set[BaseException],
# ) -> TracebackType:
# ...

View File

@ -1,134 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
# asynccontextmanager as acm,
contextmanager as cm,
)
from collections import defaultdict
from typing import (
Callable,
Any,
)
import trio
from trio import TaskStatus
from tractor import (
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
)
from ._util import (
log, # sub-sys logger
)
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our oustanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixtue semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: singleton factory API instead of a class API
@cm
def open_service_mngr(
*,
_singleton: list[ServiceMngr|None] = [None],
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
else:
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
with mngr:
yield mngr

View File

@ -54,12 +54,11 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
# FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check?
CUSTOM_LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'DEVX': 17,
'CANCEL': 22,
'CANCEL': 18,
'PDB': 500,
}
STD_PALETTE = {
@ -148,8 +147,6 @@ class StackLevelAdapter(LoggerAdapter):
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
NOTE: all custom level methods (above) delegate to this!
'''
if self.isEnabledFor(level):
stacklevel: int = 3

View File

@ -41,10 +41,8 @@ import textwrap
from typing import (
Any,
Callable,
Protocol,
Type,
TYPE_CHECKING,
TypeVar,
Union,
)
from types import ModuleType
@ -183,11 +181,7 @@ def mk_dec(
dec_hook: Callable|None = None,
) -> MsgDec:
'''
Create an IPC msg decoder, normally used as the
`PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
'''
return MsgDec(
_dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]`
@ -233,13 +227,6 @@ def pformat_msgspec(
join_char: str = '\n',
) -> str:
'''
Pretty `str` format the `msgspec.msgpack.Decoder.type` attributed
for display in log messages as a nice (maybe multiline)
presentation of all the supported `Struct`s availed for typed
decoding.
'''
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
return join_char.join(
mk_msgspec_table(
@ -643,57 +630,31 @@ def limit_msg_spec(
# # import pdbp; pdbp.set_trace()
# assert ext_codec.pld_spec == extended_spec
# yield ext_codec
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
# and then call `.to_dict()` on them?
# -[x] we're going to need to re-impl all the stuff changed in the
# runtime port such that it can handle dicts or `Msg`s?
#
# ^-TODO-^ is it impossible to make something like this orr!?
# TODO: make an auto-custom hook generator from a set of input custom
# types?
# -[ ] below is a proto design using a `TypeCodec` idea?
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does
# manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
#
# type var for the expected interchange-lib's
# IPC-transport type when not available as a built-in
# serialization output.
WireT = TypeVar('WireT')
# TODO: some kinda (decorator) API for built-in subtypes
# that builds this implicitly by inspecting the `mro()`?
class TypeCodec(Protocol):
'''
A per-custom-type wire-transport serialization translator
description type.
'''
src_type: Type
wire_type: WireT
def encode(obj: Type) -> WireT:
...
def decode(
obj_type: Type[WireT],
obj: WireT,
) -> Type:
...
class MsgpackTypeCodec(TypeCodec):
...
def mk_codec_hooks(
type_codecs: list[TypeCodec],
) -> tuple[Callable, Callable]:
'''
Deliver a `enc_hook()`/`dec_hook()` pair which handle
manual convertion from an input `Type` set such that whenever
the `TypeCodec.filter()` predicate matches the
`TypeCodec.decode()` is called on the input native object by
the `dec_hook()` and whenever the
`isiinstance(obj, TypeCodec.type)` matches against an
`enc_hook(obj=obj)` the return value is taken from a
`TypeCodec.encode(obj)` callback.
'''
...
# Note: this is is/was primarly used while moving the core
# runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
#
# '''
# return (
# # enc_to_dict,
# dec_from_dict,
# )

View File

@ -580,15 +580,12 @@ async def drain_to_final_msg(
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled as taskc:
# from tractor.devx._debug import pause
# await pause(shield=True)
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's
# cancellation.
ctx.maybe_raise(
hide_tb=hide_tb,
# TODO: when use this/
# from_src_exc=taskc,
)

View File

@ -34,9 +34,6 @@ from pprint import (
saferepr,
)
from tractor.log import get_logger
log = get_logger()
# TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks?
# taken and modified from:
@ -146,13 +143,7 @@ def pformat(
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
try:
val_str: str = saferepr(v)
except Exception:
log.exception(
'Failed to `saferepr({type(struct)})` !?\n'
)
return _Struct.__repr__(struct)
val_str: str = saferepr(v)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -203,20 +194,12 @@ class Struct(
return sin_props
pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def __repr__(self) -> str:
try:
return pformat(self)
except Exception:
log.exception(
f'Failed to `pformat({type(self)})` !?\n'
)
return _Struct.__repr__(self)
__repr__ = pformat
def copy(
self,

View File

@ -156,12 +156,11 @@ class BroadcastState(Struct):
class BroadcastReceiver(ReceiveChannel):
'''
A memory receive channel broadcaster which is non-lossy for
the fastest consumer.
A memory receive channel broadcaster which is non-lossy for the
fastest consumer.
Additional consumer tasks can receive all produced values by
registering with ``.subscribe()`` and receiving from the new
instance it delivers.
Additional consumer tasks can receive all produced values by registering
with ``.subscribe()`` and receiving from the new instance it delivers.
'''
def __init__(

View File

@ -18,12 +18,8 @@
Async context manager primitives with hard ``trio``-aware semantics
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from contextlib import asynccontextmanager as acm
import inspect
from types import ModuleType
from typing import (
Any,
AsyncContextManager,
@ -34,16 +30,13 @@ from typing import (
Optional,
Sequence,
TypeVar,
TYPE_CHECKING,
)
import trio
from tractor._state import current_actor
from tractor.log import get_logger
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__)
@ -53,10 +46,8 @@ T = TypeVar("T")
@acm
async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None,
nursery: trio.Nursery | None = None,
shield: bool = False,
lib: ModuleType = trio,
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
@ -67,12 +58,13 @@ async def maybe_open_nursery(
if nursery is not None:
yield nursery
else:
async with lib.open_nursery() as nursery:
async with trio.open_nursery() as nursery:
nursery.cancel_scope.shield = shield
yield nursery
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
@ -99,6 +91,7 @@ async def _enter_and_wait(
@acm
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[
@ -109,17 +102,15 @@ async def gather_contexts(
None,
]:
'''
Concurrently enter a sequence of async context managers (acms),
each from a separate `trio` task and deliver the unwrapped
`yield`-ed values in the same order once all managers have entered.
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
On exit, all acms are subsequently and concurrently exited.
This function is somewhat similar to a batch of non-blocking
calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R).
This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited, and cancellation just works.
'''
seed: int = id(mngrs)
@ -219,10 +210,9 @@ async def maybe_open_context(
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open an async-context-manager (acm) if there is not already
a `_Cached` version for the provided (input) `key` for *this* actor.
Return the `_Cached` instance on a _Cache hit.
Maybe open a context manager if there is not already a _Cached
version for the provided ``key`` for *this* actor. Return the
_Cached instance on a _Cache hit.
'''
fid = id(acm_func)
@ -283,13 +273,8 @@ async def maybe_open_context(
else:
_Cache.users += 1
log.runtime(
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
# TODO: make this work with values but without
# `msgspec.Struct` causing frickin crashes on field-type
# lookups..
# f'{ctx_key!r} -> {yielded!r}\n'
f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
yield True, yielded