Compare commits
1 Commits
5ed30dec40
...
284fa0340e
Author | SHA1 | Date |
---|---|---|
|
284fa0340e |
|
@ -1,6 +1,7 @@
|
||||||
"""
|
"""
|
||||||
``tractor`` testing!!
|
``tractor`` testing!!
|
||||||
"""
|
"""
|
||||||
|
from functools import partial
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
import os
|
import os
|
||||||
|
@ -8,6 +9,9 @@ import random
|
||||||
import signal
|
import signal
|
||||||
import platform
|
import platform
|
||||||
import time
|
import time
|
||||||
|
from typing import (
|
||||||
|
AsyncContextManager,
|
||||||
|
)
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -150,6 +154,18 @@ def pytest_generate_tests(metafunc):
|
||||||
metafunc.parametrize("start_method", [spawn_backend], scope='module')
|
metafunc.parametrize("start_method", [spawn_backend], scope='module')
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: a way to let test scripts (like from `examples/`)
|
||||||
|
# guarantee they won't registry addr collide!
|
||||||
|
@pytest.fixture
|
||||||
|
def open_test_runtime(
|
||||||
|
reg_addr: tuple,
|
||||||
|
) -> AsyncContextManager:
|
||||||
|
return partial(
|
||||||
|
tractor.open_nursery,
|
||||||
|
registry_addrs=[reg_addr],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def sig_prog(proc, sig):
|
def sig_prog(proc, sig):
|
||||||
"Kill the actor-process with ``sig``."
|
"Kill the actor-process with ``sig``."
|
||||||
proc.send_signal(sig)
|
proc.send_signal(sig)
|
||||||
|
|
|
@ -41,7 +41,7 @@ from tractor.msg import (
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
_payload_msgs,
|
_payload_msgs,
|
||||||
log,
|
log,
|
||||||
Msg,
|
PayloadMsg,
|
||||||
Started,
|
Started,
|
||||||
mk_msg_spec,
|
mk_msg_spec,
|
||||||
)
|
)
|
||||||
|
@ -61,7 +61,7 @@ def mk_custom_codec(
|
||||||
uid: tuple[str, str] = tractor.current_actor().uid
|
uid: tuple[str, str] = tractor.current_actor().uid
|
||||||
|
|
||||||
# XXX NOTE XXX: despite defining `NamespacePath` as a type
|
# XXX NOTE XXX: despite defining `NamespacePath` as a type
|
||||||
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
|
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
|
||||||
# to cast to/from that type on the wire. See the docs:
|
# to cast to/from that type on the wire. See the docs:
|
||||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||||
|
|
||||||
|
@ -321,12 +321,12 @@ def dec_type_union(
|
||||||
import importlib
|
import importlib
|
||||||
types: list[Type] = []
|
types: list[Type] = []
|
||||||
for type_name in type_names:
|
for type_name in type_names:
|
||||||
for ns in [
|
for mod in [
|
||||||
typing,
|
typing,
|
||||||
importlib.import_module(__name__),
|
importlib.import_module(__name__),
|
||||||
]:
|
]:
|
||||||
if type_ref := getattr(
|
if type_ref := getattr(
|
||||||
ns,
|
mod,
|
||||||
type_name,
|
type_name,
|
||||||
False,
|
False,
|
||||||
):
|
):
|
||||||
|
@ -744,7 +744,7 @@ def chk_pld_type(
|
||||||
# 'Error', .pld: ErrorData
|
# 'Error', .pld: ErrorData
|
||||||
|
|
||||||
codec: MsgCodec = mk_codec(
|
codec: MsgCodec = mk_codec(
|
||||||
# NOTE: this ONLY accepts `Msg.pld` fields of a specified
|
# NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
|
||||||
# type union.
|
# type union.
|
||||||
ipc_pld_spec=payload_spec,
|
ipc_pld_spec=payload_spec,
|
||||||
)
|
)
|
||||||
|
@ -752,7 +752,7 @@ def chk_pld_type(
|
||||||
# make a one-off dec to compare with our `MsgCodec` instance
|
# make a one-off dec to compare with our `MsgCodec` instance
|
||||||
# which does the below `mk_msg_spec()` call internally
|
# which does the below `mk_msg_spec()` call internally
|
||||||
ipc_msg_spec: Union[Type[Struct]]
|
ipc_msg_spec: Union[Type[Struct]]
|
||||||
msg_types: list[Msg[payload_spec]]
|
msg_types: list[PayloadMsg[payload_spec]]
|
||||||
(
|
(
|
||||||
ipc_msg_spec,
|
ipc_msg_spec,
|
||||||
msg_types,
|
msg_types,
|
||||||
|
@ -761,7 +761,7 @@ def chk_pld_type(
|
||||||
)
|
)
|
||||||
_enc = msgpack.Encoder()
|
_enc = msgpack.Encoder()
|
||||||
_dec = msgpack.Decoder(
|
_dec = msgpack.Decoder(
|
||||||
type=ipc_msg_spec or Any, # like `Msg[Any]`
|
type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
|
||||||
)
|
)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
|
@ -806,7 +806,7 @@ def chk_pld_type(
|
||||||
'cid': '666',
|
'cid': '666',
|
||||||
'pld': pld,
|
'pld': pld,
|
||||||
}
|
}
|
||||||
enc_msg: Msg = typedef(**kwargs)
|
enc_msg: PayloadMsg = typedef(**kwargs)
|
||||||
|
|
||||||
_wire_bytes: bytes = _enc.encode(enc_msg)
|
_wire_bytes: bytes = _enc.encode(enc_msg)
|
||||||
wire_bytes: bytes = codec.enc.encode(enc_msg)
|
wire_bytes: bytes = codec.enc.encode(enc_msg)
|
||||||
|
@ -883,25 +883,16 @@ def test_limit_msgspec():
|
||||||
debug_mode=True
|
debug_mode=True
|
||||||
):
|
):
|
||||||
|
|
||||||
# ensure we can round-trip a boxing `Msg`
|
# ensure we can round-trip a boxing `PayloadMsg`
|
||||||
assert chk_pld_type(
|
assert chk_pld_type(
|
||||||
# Msg,
|
payload_spec=Any,
|
||||||
Any,
|
pld=None,
|
||||||
None,
|
|
||||||
expect_roundtrip=True,
|
expect_roundtrip=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: don't need this any more right since
|
|
||||||
# `msgspec>=0.15` has the nice generics stuff yah??
|
|
||||||
#
|
|
||||||
# manually override the type annot of the payload
|
|
||||||
# field and ensure it propagates to all msg-subtypes.
|
|
||||||
# Msg.__annotations__['pld'] = Any
|
|
||||||
|
|
||||||
# verify that a mis-typed payload value won't decode
|
# verify that a mis-typed payload value won't decode
|
||||||
assert not chk_pld_type(
|
assert not chk_pld_type(
|
||||||
# Msg,
|
payload_spec=int,
|
||||||
int,
|
|
||||||
pld='doggy',
|
pld='doggy',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -913,18 +904,16 @@ def test_limit_msgspec():
|
||||||
value: Any
|
value: Any
|
||||||
|
|
||||||
assert not chk_pld_type(
|
assert not chk_pld_type(
|
||||||
# Msg,
|
payload_spec=CustomPayload,
|
||||||
CustomPayload,
|
|
||||||
pld='doggy',
|
pld='doggy',
|
||||||
)
|
)
|
||||||
|
|
||||||
assert chk_pld_type(
|
assert chk_pld_type(
|
||||||
# Msg,
|
payload_spec=CustomPayload,
|
||||||
CustomPayload,
|
|
||||||
pld=CustomPayload(name='doggy', value='urmom')
|
pld=CustomPayload(name='doggy', value='urmom')
|
||||||
)
|
)
|
||||||
|
|
||||||
# uhh bc we can `.pause_from_sync()` now! :surfer:
|
# yah, we can `.pause_from_sync()` now!
|
||||||
# breakpoint()
|
# breakpoint()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
|
@ -1336,6 +1336,23 @@ def test_shield_pause(
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: better error for "non-ideal" usage from the root actor.
|
||||||
|
# -[ ] if called from an async scope emit a message that suggests
|
||||||
|
# using `await tractor.pause()` instead since it's less overhead
|
||||||
|
# (in terms of `greenback` and/or extra threads) and if it's from
|
||||||
|
# a sync scope suggest that usage must first call
|
||||||
|
# `ensure_portal()` in the (eventual parent) async calling scope?
|
||||||
|
def test_sync_pause_from_bg_task_in_root_actor_():
|
||||||
|
'''
|
||||||
|
When used from the root actor, normally we can only implicitly
|
||||||
|
support `.pause_from_sync()` from the main-parent-task (that
|
||||||
|
opens the runtime via `open_root_actor()`) since `greenback`
|
||||||
|
requires a `.ensure_portal()` call per `trio.Task` where it is
|
||||||
|
used.
|
||||||
|
|
||||||
|
'''
|
||||||
|
...
|
||||||
|
|
||||||
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
||||||
def test_correct_frames_below_hidden():
|
def test_correct_frames_below_hidden():
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -19,7 +19,7 @@ from tractor._testing import (
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def run_example_in_subproc(
|
def run_example_in_subproc(
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
testdir,
|
testdir: pytest.Testdir,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
):
|
):
|
||||||
|
|
||||||
|
|
|
@ -121,10 +121,19 @@ class Unresolved:
|
||||||
@dataclass
|
@dataclass
|
||||||
class Context:
|
class Context:
|
||||||
'''
|
'''
|
||||||
An inter-actor, SC transitive, `Task` communication context.
|
An inter-actor, SC transitive, `trio.Task` (pair)
|
||||||
|
communication context.
|
||||||
|
|
||||||
NB: This class should **never be instatiated directly**, it is allocated
|
(We've also considered other names and ideas:
|
||||||
by the runtime in 2 ways:
|
- "communicating tasks scope": cts
|
||||||
|
- "distributed task scope": dts
|
||||||
|
- "communicating tasks context": ctc
|
||||||
|
|
||||||
|
**Got a better idea for naming? Make an issue dawg!**
|
||||||
|
)
|
||||||
|
|
||||||
|
NB: This class should **never be instatiated directly**, it is
|
||||||
|
allocated by the runtime in 2 ways:
|
||||||
- by entering `Portal.open_context()` which is the primary
|
- by entering `Portal.open_context()` which is the primary
|
||||||
public API for any "parent" task or,
|
public API for any "parent" task or,
|
||||||
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
|
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
|
||||||
|
@ -210,6 +219,16 @@ class Context:
|
||||||
# more the the `Context` is needed?
|
# more the the `Context` is needed?
|
||||||
_portal: Portal | None = None
|
_portal: Portal | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def portal(self) -> Portal|None:
|
||||||
|
'''
|
||||||
|
Return any wrapping memory-`Portal` if this is
|
||||||
|
a 'parent'-side task which called `Portal.open_context()`,
|
||||||
|
otherwise `None`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._portal
|
||||||
|
|
||||||
# NOTE: each side of the context has its own cancel scope
|
# NOTE: each side of the context has its own cancel scope
|
||||||
# which is exactly the primitive that allows for
|
# which is exactly the primitive that allows for
|
||||||
# cross-actor-task-supervision and thus SC.
|
# cross-actor-task-supervision and thus SC.
|
||||||
|
@ -299,6 +318,8 @@ class Context:
|
||||||
# boxed exception. NOW, it's used for spawning overrun queuing
|
# boxed exception. NOW, it's used for spawning overrun queuing
|
||||||
# tasks when `.allow_overruns == True` !!!
|
# tasks when `.allow_overruns == True` !!!
|
||||||
_scope_nursery: trio.Nursery|None = None
|
_scope_nursery: trio.Nursery|None = None
|
||||||
|
# ^-TODO-^ change name?
|
||||||
|
# -> `._scope_tn` "scope task nursery"
|
||||||
|
|
||||||
# streaming overrun state tracking
|
# streaming overrun state tracking
|
||||||
_in_overrun: bool = False
|
_in_overrun: bool = False
|
||||||
|
@ -408,10 +429,23 @@ class Context:
|
||||||
'''
|
'''
|
||||||
return self._cancel_called
|
return self._cancel_called
|
||||||
|
|
||||||
|
@cancel_called.setter
|
||||||
|
def cancel_called(self, val: bool) -> None:
|
||||||
|
'''
|
||||||
|
Set the self-cancelled request `bool` value.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# to debug who frickin sets it..
|
||||||
|
# if val:
|
||||||
|
# from .devx import pause_from_sync
|
||||||
|
# pause_from_sync()
|
||||||
|
|
||||||
|
self._cancel_called = val
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def canceller(self) -> tuple[str, str]|None:
|
def canceller(self) -> tuple[str, str]|None:
|
||||||
'''
|
'''
|
||||||
``Actor.uid: tuple[str, str]`` of the (remote)
|
`Actor.uid: tuple[str, str]` of the (remote)
|
||||||
actor-process who's task was cancelled thus causing this
|
actor-process who's task was cancelled thus causing this
|
||||||
(side of the) context to also be cancelled.
|
(side of the) context to also be cancelled.
|
||||||
|
|
||||||
|
@ -515,7 +549,7 @@ class Context:
|
||||||
|
|
||||||
# the local scope was never cancelled
|
# the local scope was never cancelled
|
||||||
# and instead likely we received a remote side
|
# and instead likely we received a remote side
|
||||||
# # cancellation that was raised inside `.result()`
|
# # cancellation that was raised inside `.wait_for_result()`
|
||||||
# or (
|
# or (
|
||||||
# (se := self._local_error)
|
# (se := self._local_error)
|
||||||
# and se is re
|
# and se is re
|
||||||
|
@ -585,6 +619,8 @@ class Context:
|
||||||
self,
|
self,
|
||||||
error: BaseException,
|
error: BaseException,
|
||||||
|
|
||||||
|
set_cancel_called: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
(Maybe) cancel this local scope due to a received remote
|
(Maybe) cancel this local scope due to a received remote
|
||||||
|
@ -603,7 +639,7 @@ class Context:
|
||||||
- `Portal.open_context()`
|
- `Portal.open_context()`
|
||||||
- `Portal.result()`
|
- `Portal.result()`
|
||||||
- `Context.open_stream()`
|
- `Context.open_stream()`
|
||||||
- `Context.result()`
|
- `Context.wait_for_result()`
|
||||||
|
|
||||||
when called/closed by actor local task(s).
|
when called/closed by actor local task(s).
|
||||||
|
|
||||||
|
@ -729,7 +765,7 @@ class Context:
|
||||||
|
|
||||||
# Cancel the local `._scope`, catch that
|
# Cancel the local `._scope`, catch that
|
||||||
# `._scope.cancelled_caught` and re-raise any remote error
|
# `._scope.cancelled_caught` and re-raise any remote error
|
||||||
# once exiting (or manually calling `.result()`) the
|
# once exiting (or manually calling `.wait_for_result()`) the
|
||||||
# `.open_context()` block.
|
# `.open_context()` block.
|
||||||
cs: trio.CancelScope = self._scope
|
cs: trio.CancelScope = self._scope
|
||||||
if (
|
if (
|
||||||
|
@ -764,8 +800,9 @@ class Context:
|
||||||
# `trio.Cancelled` subtype here ;)
|
# `trio.Cancelled` subtype here ;)
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||||
|
# from .devx import pause_from_sync
|
||||||
|
# pause_from_sync()
|
||||||
self._scope.cancel()
|
self._scope.cancel()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
# from .devx import mk_pdb
|
# from .devx import mk_pdb
|
||||||
|
@ -889,7 +926,7 @@ class Context:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
side: str = self.side
|
side: str = self.side
|
||||||
self._cancel_called: bool = True
|
self.cancel_called: bool = True
|
||||||
|
|
||||||
header: str = (
|
header: str = (
|
||||||
f'Cancelling ctx with peer from {side.upper()} side\n\n'
|
f'Cancelling ctx with peer from {side.upper()} side\n\n'
|
||||||
|
@ -912,7 +949,7 @@ class Context:
|
||||||
# `._scope.cancel()` since we expect the eventual
|
# `._scope.cancel()` since we expect the eventual
|
||||||
# `ContextCancelled` from the other side to trigger this
|
# `ContextCancelled` from the other side to trigger this
|
||||||
# when the runtime finally receives it during teardown
|
# when the runtime finally receives it during teardown
|
||||||
# (normally in `.result()` called from
|
# (normally in `.wait_for_result()` called from
|
||||||
# `Portal.open_context().__aexit__()`)
|
# `Portal.open_context().__aexit__()`)
|
||||||
if side == 'parent':
|
if side == 'parent':
|
||||||
if not self._portal:
|
if not self._portal:
|
||||||
|
@ -1025,10 +1062,10 @@ class Context:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
our_uid: tuple = self.chan.uid
|
peer_uid: tuple = self.chan.uid
|
||||||
|
|
||||||
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
|
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
|
||||||
# for "graceful cancellation" case:
|
# for "graceful cancellation" case(s):
|
||||||
#
|
#
|
||||||
# Whenever a "side" of a context (a `Task` running in
|
# Whenever a "side" of a context (a `Task` running in
|
||||||
# an actor) **is** the side which requested ctx
|
# an actor) **is** the side which requested ctx
|
||||||
|
@ -1045,9 +1082,11 @@ class Context:
|
||||||
# set to the `Actor.uid` of THIS task (i.e. the
|
# set to the `Actor.uid` of THIS task (i.e. the
|
||||||
# cancellation requesting task's actor is the actor
|
# cancellation requesting task's actor is the actor
|
||||||
# checking whether it should absorb the ctxc).
|
# checking whether it should absorb the ctxc).
|
||||||
|
self_ctxc: bool = self._is_self_cancelled(remote_error)
|
||||||
if (
|
if (
|
||||||
|
self_ctxc
|
||||||
|
and
|
||||||
not raise_ctxc_from_self_call
|
not raise_ctxc_from_self_call
|
||||||
and self._is_self_cancelled(remote_error)
|
|
||||||
|
|
||||||
# TODO: ?potentially it is useful to emit certain
|
# TODO: ?potentially it is useful to emit certain
|
||||||
# warning/cancel logs for the cases where the
|
# warning/cancel logs for the cases where the
|
||||||
|
@ -1077,8 +1116,8 @@ class Context:
|
||||||
and isinstance(remote_error, RemoteActorError)
|
and isinstance(remote_error, RemoteActorError)
|
||||||
and remote_error.boxed_type is StreamOverrun
|
and remote_error.boxed_type is StreamOverrun
|
||||||
|
|
||||||
# and tuple(remote_error.msgdata['sender']) == our_uid
|
# and tuple(remote_error.msgdata['sender']) == peer_uid
|
||||||
and tuple(remote_error.sender) == our_uid
|
and tuple(remote_error.sender) == peer_uid
|
||||||
):
|
):
|
||||||
# NOTE: we set the local scope error to any "self
|
# NOTE: we set the local scope error to any "self
|
||||||
# cancellation" error-response thus "absorbing"
|
# cancellation" error-response thus "absorbing"
|
||||||
|
@ -1140,9 +1179,9 @@ class Context:
|
||||||
of the remote cancellation.
|
of the remote cancellation.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = hide_tb
|
__tracebackhide__: bool = False
|
||||||
assert self._portal, (
|
assert self._portal, (
|
||||||
"Context.result() can not be called from callee side!"
|
'`Context.wait_for_result()` can not be called from callee side!'
|
||||||
)
|
)
|
||||||
if self._final_result_is_set():
|
if self._final_result_is_set():
|
||||||
return self._result
|
return self._result
|
||||||
|
@ -1169,7 +1208,8 @@ class Context:
|
||||||
drained_msgs,
|
drained_msgs,
|
||||||
) = await msgops.drain_to_final_msg(
|
) = await msgops.drain_to_final_msg(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
hide_tb=hide_tb,
|
# hide_tb=hide_tb,
|
||||||
|
hide_tb=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
drained_status: str = (
|
drained_status: str = (
|
||||||
|
@ -1185,6 +1225,8 @@ class Context:
|
||||||
|
|
||||||
log.cancel(drained_status)
|
log.cancel(drained_status)
|
||||||
|
|
||||||
|
# __tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
self.maybe_raise(
|
self.maybe_raise(
|
||||||
# NOTE: obvi we don't care if we
|
# NOTE: obvi we don't care if we
|
||||||
# overran the far end if we're already
|
# overran the far end if we're already
|
||||||
|
@ -1197,7 +1239,8 @@ class Context:
|
||||||
# raising something we know might happen
|
# raising something we know might happen
|
||||||
# during cancellation ;)
|
# during cancellation ;)
|
||||||
(not self._cancel_called)
|
(not self._cancel_called)
|
||||||
)
|
),
|
||||||
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
# TODO: eventually make `.outcome: Outcome` and thus return
|
# TODO: eventually make `.outcome: Outcome` and thus return
|
||||||
# `self.outcome.unwrap()` here!
|
# `self.outcome.unwrap()` here!
|
||||||
|
@ -1583,7 +1626,7 @@ class Context:
|
||||||
|
|
||||||
- NEVER `return` early before delivering the msg!
|
- NEVER `return` early before delivering the msg!
|
||||||
bc if the error is a ctxc and there is a task waiting on
|
bc if the error is a ctxc and there is a task waiting on
|
||||||
`.result()` we need the msg to be
|
`.wait_for_result()` we need the msg to be
|
||||||
`send_chan.send_nowait()`-ed over the `._rx_chan` so
|
`send_chan.send_nowait()`-ed over the `._rx_chan` so
|
||||||
that the error is relayed to that waiter task and thus
|
that the error is relayed to that waiter task and thus
|
||||||
raised in user code!
|
raised in user code!
|
||||||
|
@ -1828,7 +1871,7 @@ async def open_context_from_portal(
|
||||||
When the "callee" (side that is "called"/started by a call
|
When the "callee" (side that is "called"/started by a call
|
||||||
to *this* method) returns, the caller side (this) unblocks
|
to *this* method) returns, the caller side (this) unblocks
|
||||||
and any final value delivered from the other end can be
|
and any final value delivered from the other end can be
|
||||||
retrieved using the `Contex.result()` api.
|
retrieved using the `Contex.wait_for_result()` api.
|
||||||
|
|
||||||
The yielded ``Context`` instance further allows for opening
|
The yielded ``Context`` instance further allows for opening
|
||||||
bidirectional streams, explicit cancellation and
|
bidirectional streams, explicit cancellation and
|
||||||
|
@ -1965,14 +2008,14 @@ async def open_context_from_portal(
|
||||||
yield ctx, first
|
yield ctx, first
|
||||||
|
|
||||||
# ??TODO??: do we still want to consider this or is
|
# ??TODO??: do we still want to consider this or is
|
||||||
# the `else:` block handling via a `.result()`
|
# the `else:` block handling via a `.wait_for_result()`
|
||||||
# call below enough??
|
# call below enough??
|
||||||
#
|
#
|
||||||
# -[ ] pretty sure `.result()` internals do the
|
# -[ ] pretty sure `.wait_for_result()` internals do the
|
||||||
# same as our ctxc handler below so it ended up
|
# same as our ctxc handler below so it ended up
|
||||||
# being same (repeated?) behaviour, but ideally we
|
# being same (repeated?) behaviour, but ideally we
|
||||||
# wouldn't have that duplication either by somehow
|
# wouldn't have that duplication either by somehow
|
||||||
# factoring the `.result()` handler impl in a way
|
# factoring the `.wait_for_result()` handler impl in a way
|
||||||
# that we can re-use it around the `yield` ^ here
|
# that we can re-use it around the `yield` ^ here
|
||||||
# or vice versa?
|
# or vice versa?
|
||||||
#
|
#
|
||||||
|
@ -2110,7 +2153,7 @@ async def open_context_from_portal(
|
||||||
# AND a group-exc is only raised if there was > 1
|
# AND a group-exc is only raised if there was > 1
|
||||||
# tasks started *here* in the "caller" / opener
|
# tasks started *here* in the "caller" / opener
|
||||||
# block. If any one of those tasks calls
|
# block. If any one of those tasks calls
|
||||||
# `.result()` or `MsgStream.receive()`
|
# `.wait_for_result()` or `MsgStream.receive()`
|
||||||
# `._maybe_raise_remote_err()` will be transitively
|
# `._maybe_raise_remote_err()` will be transitively
|
||||||
# called and the remote error raised causing all
|
# called and the remote error raised causing all
|
||||||
# tasks to be cancelled.
|
# tasks to be cancelled.
|
||||||
|
@ -2180,7 +2223,7 @@ async def open_context_from_portal(
|
||||||
f'|_{ctx._task}\n'
|
f'|_{ctx._task}\n'
|
||||||
)
|
)
|
||||||
# XXX NOTE XXX: the below call to
|
# XXX NOTE XXX: the below call to
|
||||||
# `Context.result()` will ALWAYS raise
|
# `Context.wait_for_result()` will ALWAYS raise
|
||||||
# a `ContextCancelled` (via an embedded call to
|
# a `ContextCancelled` (via an embedded call to
|
||||||
# `Context._maybe_raise_remote_err()`) IFF
|
# `Context._maybe_raise_remote_err()`) IFF
|
||||||
# a `Context._remote_error` was set by the runtime
|
# a `Context._remote_error` was set by the runtime
|
||||||
|
@ -2190,10 +2233,10 @@ async def open_context_from_portal(
|
||||||
# ALWAYS SET any time "callee" side fails and causes "caller
|
# ALWAYS SET any time "callee" side fails and causes "caller
|
||||||
# side" cancellation via a `ContextCancelled` here.
|
# side" cancellation via a `ContextCancelled` here.
|
||||||
try:
|
try:
|
||||||
result_or_err: Exception|Any = await ctx.result()
|
result_or_err: Exception|Any = await ctx.wait_for_result()
|
||||||
except BaseException as berr:
|
except BaseException as berr:
|
||||||
# on normal teardown, if we get some error
|
# on normal teardown, if we get some error
|
||||||
# raised in `Context.result()` we still want to
|
# raised in `Context.wait_for_result()` we still want to
|
||||||
# save that error on the ctx's state to
|
# save that error on the ctx's state to
|
||||||
# determine things like `.cancelled_caught` for
|
# determine things like `.cancelled_caught` for
|
||||||
# cases where there was remote cancellation but
|
# cases where there was remote cancellation but
|
||||||
|
|
|
@ -56,14 +56,12 @@ async def get_registry(
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Return a portal instance connected to a local or remote
|
Return a portal instance connected to a local or remote
|
||||||
arbiter.
|
registry-service actor; if a connection already exists re-use it
|
||||||
|
(presumably to call a `.register_actor()` registry runtime RPC
|
||||||
|
ep).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
|
|
||||||
if not actor:
|
|
||||||
raise RuntimeError("No actor instance has been defined yet?")
|
|
||||||
|
|
||||||
if actor.is_registrar:
|
if actor.is_registrar:
|
||||||
# we're already the arbiter
|
# we're already the arbiter
|
||||||
# (likely a re-entrant call from the arbiter actor)
|
# (likely a re-entrant call from the arbiter actor)
|
||||||
|
@ -72,6 +70,8 @@ async def get_registry(
|
||||||
Channel((host, port))
|
Channel((host, port))
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
# TODO: try to look pre-existing connection from
|
||||||
|
# `Actor._peers` and use it instead?
|
||||||
async with (
|
async with (
|
||||||
_connect_chan(host, port) as chan,
|
_connect_chan(host, port) as chan,
|
||||||
open_portal(chan) as regstr_ptl,
|
open_portal(chan) as regstr_ptl,
|
||||||
|
|
|
@ -20,7 +20,8 @@ Sub-process entry points.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from functools import partial
|
from functools import partial
|
||||||
# import textwrap
|
import os
|
||||||
|
import textwrap
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -58,7 +59,7 @@ def _mp_main(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
The routine called *after fork* which invokes a fresh ``trio.run``
|
The routine called *after fork* which invokes a fresh `trio.run()`
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor._forkserver_info = forkserver_info
|
actor._forkserver_info = forkserver_info
|
||||||
|
@ -96,6 +97,35 @@ def _mp_main(
|
||||||
log.info(f"Subactor {actor.uid} terminated")
|
log.info(f"Subactor {actor.uid} terminated")
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: move this to some kinda `.devx._conc_lang.py` eventually
|
||||||
|
# as we work out our multi-domain state-flow-syntax!
|
||||||
|
def nest_from_op(
|
||||||
|
input_op: str,
|
||||||
|
tree_str: str,
|
||||||
|
|
||||||
|
back_from_op: int = 1,
|
||||||
|
) -> str:
|
||||||
|
'''
|
||||||
|
Depth-increment the input (presumably hierarchy/supervision)
|
||||||
|
input "tree string" below the provided `input_op` execution
|
||||||
|
operator, so injecting a `"\n|_{input_op}\n"`and indenting the
|
||||||
|
`tree_str` to nest content aligned with the ops last char.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return (
|
||||||
|
f'{input_op}\n'
|
||||||
|
+
|
||||||
|
textwrap.indent(
|
||||||
|
tree_str,
|
||||||
|
prefix=(
|
||||||
|
len(input_op)
|
||||||
|
-
|
||||||
|
back_from_op
|
||||||
|
) *' ',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _trio_main(
|
def _trio_main(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
*,
|
*,
|
||||||
|
@ -119,7 +149,6 @@ def _trio_main(
|
||||||
|
|
||||||
if actor.loglevel is not None:
|
if actor.loglevel is not None:
|
||||||
get_console_log(actor.loglevel)
|
get_console_log(actor.loglevel)
|
||||||
import os
|
|
||||||
actor_info: str = (
|
actor_info: str = (
|
||||||
f'|_{actor}\n'
|
f'|_{actor}\n'
|
||||||
f' uid: {actor.uid}\n'
|
f' uid: {actor.uid}\n'
|
||||||
|
@ -128,13 +157,29 @@ def _trio_main(
|
||||||
f' loglevel: {actor.loglevel}\n'
|
f' loglevel: {actor.loglevel}\n'
|
||||||
)
|
)
|
||||||
log.info(
|
log.info(
|
||||||
'Started new trio subactor:\n'
|
'Started new `trio` subactor:\n'
|
||||||
+
|
+
|
||||||
'>\n' # like a "started/play"-icon from super perspective
|
nest_from_op(
|
||||||
+
|
input_op='(>', # like a "started/play"-icon from super perspective
|
||||||
actor_info,
|
tree_str=actor_info,
|
||||||
|
)
|
||||||
|
# '>(\n' # like a "started/play"-icon from super perspective
|
||||||
|
# +
|
||||||
|
# actor_info,
|
||||||
)
|
)
|
||||||
|
logmeth = log.info
|
||||||
|
message: str = (
|
||||||
|
# log.info(
|
||||||
|
'Subactor terminated\n'
|
||||||
|
+
|
||||||
|
nest_from_op(
|
||||||
|
input_op=')>', # like a "started/play"-icon from super perspective
|
||||||
|
tree_str=actor_info,
|
||||||
|
)
|
||||||
|
# 'x\n' # like a "crossed-out/killed" from super perspective
|
||||||
|
# +
|
||||||
|
# actor_info
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
if infect_asyncio:
|
if infect_asyncio:
|
||||||
actor._infected_aio = True
|
actor._infected_aio = True
|
||||||
|
@ -143,16 +188,18 @@ def _trio_main(
|
||||||
trio.run(trio_main)
|
trio.run(trio_main)
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
log.cancel(
|
logmeth = log.cancel
|
||||||
'Actor received KBI\n'
|
message: str = (
|
||||||
|
'Actor received KBI (aka an OS-cancel)\n'
|
||||||
+
|
+
|
||||||
actor_info
|
nest_from_op(
|
||||||
|
input_op='c)>', # like a "started/play"-icon from super perspective
|
||||||
|
tree_str=actor_info,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
except BaseException:
|
||||||
|
log.exception('Actor crashed exit?')
|
||||||
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.info(
|
logmeth(message)
|
||||||
'Subactor terminated\n'
|
|
||||||
+
|
|
||||||
'x\n' # like a "crossed-out/killed" from super perspective
|
|
||||||
+
|
|
||||||
actor_info
|
|
||||||
)
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ from typing import (
|
||||||
TypeVar,
|
TypeVar,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# import pdbp
|
||||||
import msgspec
|
import msgspec
|
||||||
from tricycle import BufferedReceiveStream
|
from tricycle import BufferedReceiveStream
|
||||||
import trio
|
import trio
|
||||||
|
@ -290,12 +291,14 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
# @pdbp.hideframe
|
||||||
async def send(
|
async def send(
|
||||||
self,
|
self,
|
||||||
msg: msgtypes.MsgType,
|
msg: msgtypes.MsgType,
|
||||||
|
|
||||||
strict_types: bool = True,
|
strict_types: bool = True,
|
||||||
# hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
||||||
|
@ -304,7 +307,10 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
invalid msg type
|
invalid msg type
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# __tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
# try:
|
||||||
|
# XXX see `trio._sync.AsyncContextManagerMixin` for details
|
||||||
|
# on the `.acquire()`/`.release()` sequencing..
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
|
|
||||||
# NOTE: lookup the `trio.Task.context`'s var for
|
# NOTE: lookup the `trio.Task.context`'s var for
|
||||||
|
@ -352,6 +358,14 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
size: bytes = struct.pack("<I", len(bytes_data))
|
size: bytes = struct.pack("<I", len(bytes_data))
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
|
|
||||||
|
# TODO: does it help ever to dynamically show this
|
||||||
|
# frame?
|
||||||
|
# except BaseException as _err:
|
||||||
|
# err = _err
|
||||||
|
# if not isinstance(err, MsgTypeError):
|
||||||
|
# __tracebackhide__: bool = False
|
||||||
|
# raise
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def laddr(self) -> tuple[str, int]:
|
def laddr(self) -> tuple[str, int]:
|
||||||
return self._laddr
|
return self._laddr
|
||||||
|
@ -560,27 +574,40 @@ class Channel:
|
||||||
)
|
)
|
||||||
return transport
|
return transport
|
||||||
|
|
||||||
|
# TODO: something like,
|
||||||
|
# `pdbp.hideframe_on(errors=[MsgTypeError])`
|
||||||
|
# @pdbp.hideframe
|
||||||
async def send(
|
async def send(
|
||||||
self,
|
self,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
|
|
||||||
# hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Send a coded msg-blob over the transport.
|
Send a coded msg-blob over the transport.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# __tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
log.transport(
|
try:
|
||||||
'=> send IPC msg:\n\n'
|
log.transport(
|
||||||
f'{pformat(payload)}\n'
|
'=> send IPC msg:\n\n'
|
||||||
) # type: ignore
|
f'{pformat(payload)}\n'
|
||||||
assert self._transport
|
)
|
||||||
await self._transport.send(
|
# assert self._transport # but why typing?
|
||||||
payload,
|
await self._transport.send(
|
||||||
# hide_tb=hide_tb,
|
payload,
|
||||||
)
|
hide_tb=hide_tb,
|
||||||
|
)
|
||||||
|
except BaseException as _err:
|
||||||
|
err = _err # bind for introspection
|
||||||
|
if not isinstance(_err, MsgTypeError):
|
||||||
|
# assert err
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
else:
|
||||||
|
assert err.cid
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
assert self._transport
|
assert self._transport
|
||||||
|
|
|
@ -121,7 +121,8 @@ class Portal:
|
||||||
)
|
)
|
||||||
return self.chan
|
return self.chan
|
||||||
|
|
||||||
# TODO: factor this out into an `ActorNursery` wrapper
|
# TODO: factor this out into a `.highlevel` API-wrapper that uses
|
||||||
|
# a single `.open_context()` call underneath.
|
||||||
async def _submit_for_result(
|
async def _submit_for_result(
|
||||||
self,
|
self,
|
||||||
ns: str,
|
ns: str,
|
||||||
|
@ -141,13 +142,22 @@ class Portal:
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: we should deprecate this API right? since if we remove
|
||||||
|
# `.run_in_actor()` (and instead move it to a `.highlevel`
|
||||||
|
# wrapper api (around a single `.open_context()` call) we don't
|
||||||
|
# really have any notion of a "main" remote task any more?
|
||||||
|
#
|
||||||
# @api_frame
|
# @api_frame
|
||||||
async def result(self) -> Any:
|
async def wait_for_result(
|
||||||
|
self,
|
||||||
|
hide_tb: bool = True,
|
||||||
|
) -> Any:
|
||||||
'''
|
'''
|
||||||
Return the result(s) from the remote actor's "main" task.
|
Return the final result delivered by a `Return`-msg from the
|
||||||
|
remote peer actor's "main" task's `return` statement.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__: bool = hide_tb
|
||||||
# Check for non-rpc errors slapped on the
|
# Check for non-rpc errors slapped on the
|
||||||
# channel for which we always raise
|
# channel for which we always raise
|
||||||
exc = self.channel._exc
|
exc = self.channel._exc
|
||||||
|
@ -182,6 +192,23 @@ class Portal:
|
||||||
|
|
||||||
return self._final_result_pld
|
return self._final_result_pld
|
||||||
|
|
||||||
|
# TODO: factor this out into a `.highlevel` API-wrapper that uses
|
||||||
|
# a single `.open_context()` call underneath.
|
||||||
|
async def result(
|
||||||
|
self,
|
||||||
|
*args,
|
||||||
|
**kwargs,
|
||||||
|
) -> Any|Exception:
|
||||||
|
typname: str = type(self).__name__
|
||||||
|
log.warning(
|
||||||
|
f'`{typname}.result()` is DEPRECATED!\n'
|
||||||
|
'Use `{typname.wait_for_result()` instead!\n'
|
||||||
|
)
|
||||||
|
return await self.wait_for_result(
|
||||||
|
*args,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
async def _cancel_streams(self):
|
async def _cancel_streams(self):
|
||||||
# terminate all locally running async generator
|
# terminate all locally running async generator
|
||||||
# IPC calls
|
# IPC calls
|
||||||
|
@ -240,6 +267,7 @@ class Portal:
|
||||||
f'{reminfo}'
|
f'{reminfo}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# XXX the one spot we set it?
|
||||||
self.channel._cancel_called: bool = True
|
self.channel._cancel_called: bool = True
|
||||||
try:
|
try:
|
||||||
# send cancel cmd - might not get response
|
# send cancel cmd - might not get response
|
||||||
|
@ -279,6 +307,8 @@ class Portal:
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# TODO: do we still need this for low level `Actor`-runtime
|
||||||
|
# method calls or can we also remove it?
|
||||||
async def run_from_ns(
|
async def run_from_ns(
|
||||||
self,
|
self,
|
||||||
namespace_path: str,
|
namespace_path: str,
|
||||||
|
@ -316,6 +346,8 @@ class Portal:
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: factor this out into a `.highlevel` API-wrapper that uses
|
||||||
|
# a single `.open_context()` call underneath.
|
||||||
async def run(
|
async def run(
|
||||||
self,
|
self,
|
||||||
func: str,
|
func: str,
|
||||||
|
@ -370,6 +402,8 @@ class Portal:
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: factor this out into a `.highlevel` API-wrapper that uses
|
||||||
|
# a single `.open_context()` call underneath.
|
||||||
@acm
|
@acm
|
||||||
async def open_stream_from(
|
async def open_stream_from(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -21,6 +21,7 @@ Root actor runtime ignition(s).
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import importlib
|
import importlib
|
||||||
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
|
@ -115,10 +116,16 @@ async def open_root_actor(
|
||||||
if (
|
if (
|
||||||
debug_mode
|
debug_mode
|
||||||
and maybe_enable_greenback
|
and maybe_enable_greenback
|
||||||
and await _debug.maybe_init_greenback(
|
and (
|
||||||
raise_not_found=False,
|
maybe_mod := await _debug.maybe_init_greenback(
|
||||||
|
raise_not_found=False,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
|
logger.info(
|
||||||
|
f'Found `greenback` installed @ {maybe_mod}\n'
|
||||||
|
'Enabling `tractor.pause_from_sync()` support!\n'
|
||||||
|
)
|
||||||
os.environ['PYTHONBREAKPOINT'] = (
|
os.environ['PYTHONBREAKPOINT'] = (
|
||||||
'tractor.devx._debug._sync_pause_from_builtin'
|
'tractor.devx._debug._sync_pause_from_builtin'
|
||||||
)
|
)
|
||||||
|
@ -264,7 +271,10 @@ async def open_root_actor(
|
||||||
|
|
||||||
except OSError:
|
except OSError:
|
||||||
# TODO: make this a "discovery" log level?
|
# TODO: make this a "discovery" log level?
|
||||||
logger.warning(f'No actor registry found @ {addr}')
|
logger.info(
|
||||||
|
f'No actor registry found @ {addr}\n'
|
||||||
|
# 'Registry will be initialized in local actor..'
|
||||||
|
)
|
||||||
|
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery() as tn:
|
||||||
for addr in registry_addrs:
|
for addr in registry_addrs:
|
||||||
|
@ -365,23 +375,25 @@ async def open_root_actor(
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
yield actor
|
yield actor
|
||||||
|
|
||||||
except (
|
except (
|
||||||
Exception,
|
Exception,
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
) as err:
|
) as err:
|
||||||
|
# XXX NOTE XXX see equiv note inside
|
||||||
import inspect
|
# `._runtime.Actor._stream_handler()` where in the
|
||||||
|
# non-root or root-that-opened-this-mahually case we
|
||||||
|
# wait for the local actor-nursery to exit before
|
||||||
|
# exiting the transport channel handler.
|
||||||
entered: bool = await _debug._maybe_enter_pm(
|
entered: bool = await _debug._maybe_enter_pm(
|
||||||
err,
|
err,
|
||||||
api_frame=inspect.currentframe(),
|
api_frame=inspect.currentframe(),
|
||||||
)
|
)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not entered
|
not entered
|
||||||
and not is_multi_cancelled(err)
|
and
|
||||||
|
not is_multi_cancelled(err)
|
||||||
):
|
):
|
||||||
logger.exception('Root actor crashed:\n')
|
logger.exception('Root actor crashed\n')
|
||||||
|
|
||||||
# ALWAYS re-raise any error bubbled up from the
|
# ALWAYS re-raise any error bubbled up from the
|
||||||
# runtime!
|
# runtime!
|
||||||
|
|
|
@ -89,6 +89,15 @@ if TYPE_CHECKING:
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: move to a `tractor.lowlevel.rpc` with the below
|
||||||
|
# func-type-cases implemented "on top of" `@context` defs.
|
||||||
|
# -[ ] std async func
|
||||||
|
# -[ ] `Portal.open_stream_from()` with async-gens?
|
||||||
|
# |_ possibly a duplex form of this with a
|
||||||
|
# `sent_from_peer = yield send_to_peer` form, which would require
|
||||||
|
# syncing the send/recv side with possibly `.receive_nowait()`
|
||||||
|
# on each `yield`?
|
||||||
|
# -[ ]
|
||||||
async def _invoke_non_context(
|
async def _invoke_non_context(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
cancel_scope: CancelScope,
|
cancel_scope: CancelScope,
|
||||||
|
@ -108,6 +117,7 @@ async def _invoke_non_context(
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
|
cs: CancelScope|None = None # ref when activated
|
||||||
|
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
# TODO: can we unify this with the `context=True` impl below?
|
||||||
if inspect.isasyncgen(coro):
|
if inspect.isasyncgen(coro):
|
||||||
|
@ -160,10 +170,6 @@ async def _invoke_non_context(
|
||||||
functype='asyncgen',
|
functype='asyncgen',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# XXX: the async-func may spawn further tasks which push
|
|
||||||
# back values like an async-generator would but must
|
|
||||||
# manualy construct the response dict-packet-responses as
|
|
||||||
# above
|
|
||||||
with cancel_scope as cs:
|
with cancel_scope as cs:
|
||||||
ctx._scope = cs
|
ctx._scope = cs
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
@ -175,15 +181,13 @@ async def _invoke_non_context(
|
||||||
await chan.send(
|
await chan.send(
|
||||||
Stop(cid=cid)
|
Stop(cid=cid)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# simplest function/method request-response pattern
|
||||||
|
# XXX: in the most minimally used case, just a scheduled internal runtime
|
||||||
|
# call to `Actor._cancel_task()` from the ctx-peer task since we
|
||||||
|
# don't (yet) have a dedicated IPC msg.
|
||||||
|
# ------ - ------
|
||||||
else:
|
else:
|
||||||
# regular async function/method
|
|
||||||
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
|
||||||
# from a remote request to cancel some `Context`.
|
|
||||||
# ------ - ------
|
|
||||||
# TODO: ideally we unify this with the above `context=True`
|
|
||||||
# block such that for any remote invocation ftype, we
|
|
||||||
# always invoke the far end RPC task scheduling the same
|
|
||||||
# way: using the linked IPC context machinery.
|
|
||||||
failed_resp: bool = False
|
failed_resp: bool = False
|
||||||
try:
|
try:
|
||||||
ack = StartAck(
|
ack = StartAck(
|
||||||
|
@ -354,8 +358,14 @@ async def _errors_relayed_via_ipc(
|
||||||
# channel.
|
# channel.
|
||||||
task_status.started(err)
|
task_status.started(err)
|
||||||
|
|
||||||
# always reraise KBIs so they propagate at the sys-process level.
|
# always reraise KBIs so they propagate at the sys-process
|
||||||
if isinstance(err, KeyboardInterrupt):
|
# level.
|
||||||
|
# XXX LOL, except when running in asyncio mode XD
|
||||||
|
# cmon guys, wtf..
|
||||||
|
if (
|
||||||
|
isinstance(err, KeyboardInterrupt)
|
||||||
|
# and not actor.is_infected_aio()
|
||||||
|
):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# RPC task bookeeping.
|
# RPC task bookeeping.
|
||||||
|
@ -458,7 +468,6 @@ async def _invoke(
|
||||||
# tb: TracebackType = None
|
# tb: TracebackType = None
|
||||||
|
|
||||||
cancel_scope = CancelScope()
|
cancel_scope = CancelScope()
|
||||||
cs: CancelScope|None = None # ref when activated
|
|
||||||
ctx = actor.get_context(
|
ctx = actor.get_context(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -607,6 +616,8 @@ async def _invoke(
|
||||||
# `@context` marked RPC function.
|
# `@context` marked RPC function.
|
||||||
# - `._portal` is never set.
|
# - `._portal` is never set.
|
||||||
try:
|
try:
|
||||||
|
tn: trio.Nursery
|
||||||
|
rpc_ctx_cs: CancelScope
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
msgops.maybe_limit_plds(
|
msgops.maybe_limit_plds(
|
||||||
|
@ -616,7 +627,7 @@ async def _invoke(
|
||||||
),
|
),
|
||||||
):
|
):
|
||||||
ctx._scope_nursery = tn
|
ctx._scope_nursery = tn
|
||||||
ctx._scope = tn.cancel_scope
|
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
|
||||||
# TODO: better `trionics` tooling:
|
# TODO: better `trionics` tooling:
|
||||||
|
@ -642,7 +653,7 @@ async def _invoke(
|
||||||
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
||||||
# which cancels the scope presuming the input error
|
# which cancels the scope presuming the input error
|
||||||
# is not a `.cancel_acked` pleaser.
|
# is not a `.cancel_acked` pleaser.
|
||||||
if ctx._scope.cancelled_caught:
|
if rpc_ctx_cs.cancelled_caught:
|
||||||
our_uid: tuple = actor.uid
|
our_uid: tuple = actor.uid
|
||||||
|
|
||||||
# first check for and raise any remote error
|
# first check for and raise any remote error
|
||||||
|
@ -652,9 +663,7 @@ async def _invoke(
|
||||||
if re := ctx._remote_error:
|
if re := ctx._remote_error:
|
||||||
ctx._maybe_raise_remote_err(re)
|
ctx._maybe_raise_remote_err(re)
|
||||||
|
|
||||||
cs: CancelScope = ctx._scope
|
if rpc_ctx_cs.cancel_called:
|
||||||
|
|
||||||
if cs.cancel_called:
|
|
||||||
canceller: tuple = ctx.canceller
|
canceller: tuple = ctx.canceller
|
||||||
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
||||||
|
|
||||||
|
@ -680,9 +689,15 @@ async def _invoke(
|
||||||
elif canceller == ctx.chan.uid:
|
elif canceller == ctx.chan.uid:
|
||||||
explain += f'its {ctx.peer_side!r}-side peer'
|
explain += f'its {ctx.peer_side!r}-side peer'
|
||||||
|
|
||||||
else:
|
elif canceller == our_uid:
|
||||||
|
explain += 'itself'
|
||||||
|
|
||||||
|
elif canceller:
|
||||||
explain += 'a remote peer'
|
explain += 'a remote peer'
|
||||||
|
|
||||||
|
else:
|
||||||
|
explain += 'an unknown cause?'
|
||||||
|
|
||||||
explain += (
|
explain += (
|
||||||
add_div(message=explain)
|
add_div(message=explain)
|
||||||
+
|
+
|
||||||
|
@ -1238,7 +1253,7 @@ async def process_messages(
|
||||||
'Exiting IPC msg loop with final msg\n\n'
|
'Exiting IPC msg loop with final msg\n\n'
|
||||||
f'<= peer: {chan.uid}\n'
|
f'<= peer: {chan.uid}\n'
|
||||||
f' |_{chan}\n\n'
|
f' |_{chan}\n\n'
|
||||||
f'{pretty_struct.pformat(msg)}'
|
# f'{pretty_struct.pformat(msg)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(message)
|
log.runtime(message)
|
||||||
|
|
|
@ -1046,6 +1046,10 @@ class Actor:
|
||||||
# TODO: another `Struct` for rtvs..
|
# TODO: another `Struct` for rtvs..
|
||||||
rvs: dict[str, Any] = spawnspec._runtime_vars
|
rvs: dict[str, Any] = spawnspec._runtime_vars
|
||||||
if rvs['_debug_mode']:
|
if rvs['_debug_mode']:
|
||||||
|
from .devx import (
|
||||||
|
enable_stack_on_sig,
|
||||||
|
maybe_init_greenback,
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
# TODO: maybe return some status msgs upward
|
# TODO: maybe return some status msgs upward
|
||||||
# to that we can emit them in `con_status`
|
# to that we can emit them in `con_status`
|
||||||
|
@ -1053,13 +1057,27 @@ class Actor:
|
||||||
log.devx(
|
log.devx(
|
||||||
'Enabling `stackscope` traces on SIGUSR1'
|
'Enabling `stackscope` traces on SIGUSR1'
|
||||||
)
|
)
|
||||||
from .devx import enable_stack_on_sig
|
|
||||||
enable_stack_on_sig()
|
enable_stack_on_sig()
|
||||||
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
log.warning(
|
log.warning(
|
||||||
'`stackscope` not installed for use in debug mode!'
|
'`stackscope` not installed for use in debug mode!'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if rvs.get('use_greenback', False):
|
||||||
|
maybe_mod: ModuleType|None = await maybe_init_greenback()
|
||||||
|
if maybe_mod:
|
||||||
|
log.devx(
|
||||||
|
'Activated `greenback` '
|
||||||
|
'for `tractor.pause_from_sync()` support!'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
rvs['use_greenback'] = False
|
||||||
|
log.warning(
|
||||||
|
'`greenback` not installed for use in debug mode!\n'
|
||||||
|
'`tractor.pause_from_sync()` not available!'
|
||||||
|
)
|
||||||
|
|
||||||
rvs['_is_root'] = False
|
rvs['_is_root'] = False
|
||||||
_state._runtime_vars.update(rvs)
|
_state._runtime_vars.update(rvs)
|
||||||
|
|
||||||
|
@ -1717,8 +1735,8 @@ async def async_main(
|
||||||
|
|
||||||
# Register with the arbiter if we're told its addr
|
# Register with the arbiter if we're told its addr
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Registering `{actor.name}` ->\n'
|
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
|
||||||
f'{pformat(accept_addrs)}'
|
# ^-TODO-^ we should instead show the maddr here^^
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: ideally we don't fan out to all registrars
|
# TODO: ideally we don't fan out to all registrars
|
||||||
|
@ -1776,9 +1794,15 @@ async def async_main(
|
||||||
|
|
||||||
# Blocks here as expected until the root nursery is
|
# Blocks here as expected until the root nursery is
|
||||||
# killed (i.e. this actor is cancelled or signalled by the parent)
|
# killed (i.e. this actor is cancelled or signalled by the parent)
|
||||||
except Exception as err:
|
except Exception as internal_err:
|
||||||
log.runtime("Closing all actor lifetime contexts")
|
# ls: ExitStack = actor.lifetime_stack
|
||||||
actor.lifetime_stack.close()
|
# log.cancel(
|
||||||
|
# 'Closing all actor-lifetime exec scopes\n\n'
|
||||||
|
# f'|_{ls}\n'
|
||||||
|
# )
|
||||||
|
# # _debug.pause_from_sync()
|
||||||
|
# # await _debug.pause(shield=True)
|
||||||
|
# ls.close()
|
||||||
|
|
||||||
if not is_registered:
|
if not is_registered:
|
||||||
# TODO: I guess we could try to connect back
|
# TODO: I guess we could try to connect back
|
||||||
|
@ -1786,7 +1810,8 @@ async def async_main(
|
||||||
# once we have that all working with std streams locking?
|
# once we have that all working with std streams locking?
|
||||||
log.exception(
|
log.exception(
|
||||||
f"Actor errored and failed to register with arbiter "
|
f"Actor errored and failed to register with arbiter "
|
||||||
f"@ {actor.reg_addrs[0]}?")
|
f"@ {actor.reg_addrs[0]}?"
|
||||||
|
)
|
||||||
log.error(
|
log.error(
|
||||||
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
|
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
|
||||||
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
|
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
|
||||||
|
@ -1799,25 +1824,44 @@ async def async_main(
|
||||||
if actor._parent_chan:
|
if actor._parent_chan:
|
||||||
await try_ship_error_to_remote(
|
await try_ship_error_to_remote(
|
||||||
actor._parent_chan,
|
actor._parent_chan,
|
||||||
err,
|
internal_err,
|
||||||
)
|
)
|
||||||
|
|
||||||
# always!
|
# always!
|
||||||
match err:
|
match internal_err:
|
||||||
case ContextCancelled():
|
case ContextCancelled():
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Actor: {actor.uid} was task-context-cancelled with,\n'
|
f'Actor: {actor.uid} was task-context-cancelled with,\n'
|
||||||
f'str(err)'
|
f'str(internal_err)'
|
||||||
)
|
)
|
||||||
case _:
|
case _:
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.runtime(
|
teardown_msg: str = (
|
||||||
'Runtime nursery complete'
|
'Runtime nursery complete'
|
||||||
'-> Closing all actor lifetime contexts..'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ls: ExitStack = actor.lifetime_stack
|
||||||
|
cbs: list[Callable] = [
|
||||||
|
repr(tup[1].__wrapped__)
|
||||||
|
for tup in ls._exit_callbacks
|
||||||
|
]
|
||||||
|
if cbs:
|
||||||
|
cbs_str: str = '\n'.join(cbs)
|
||||||
|
teardown_msg += (
|
||||||
|
'-> Closing all actor-lifetime callbacks\n\n'
|
||||||
|
f'|_{cbs_str}\n'
|
||||||
|
)
|
||||||
|
# XXX NOTE XXX this will cause an error which
|
||||||
|
# prevents any `infected_aio` actor from continuing
|
||||||
|
# and any callbacks in the `ls` here WILL NOT be
|
||||||
|
# called!!
|
||||||
|
# await _debug.pause(shield=True)
|
||||||
|
|
||||||
|
ls.close()
|
||||||
|
|
||||||
# tear down all lifetime contexts if not in guest mode
|
# tear down all lifetime contexts if not in guest mode
|
||||||
# XXX: should this just be in the entrypoint?
|
# XXX: should this just be in the entrypoint?
|
||||||
actor.lifetime_stack.close()
|
actor.lifetime_stack.close()
|
||||||
|
@ -1856,23 +1900,28 @@ async def async_main(
|
||||||
failed = True
|
failed = True
|
||||||
|
|
||||||
if failed:
|
if failed:
|
||||||
log.warning(
|
teardown_msg += (
|
||||||
f'Failed to unregister {actor.name} from '
|
f'-> Failed to unregister {actor.name} from '
|
||||||
f'registar @ {addr}'
|
f'registar @ {addr}\n'
|
||||||
)
|
)
|
||||||
|
# log.warning(
|
||||||
|
|
||||||
# Ensure all peers (actors connected to us as clients) are finished
|
# Ensure all peers (actors connected to us as clients) are finished
|
||||||
if not actor._no_more_peers.is_set():
|
if not actor._no_more_peers.is_set():
|
||||||
if any(
|
if any(
|
||||||
chan.connected() for chan in chain(*actor._peers.values())
|
chan.connected() for chan in chain(*actor._peers.values())
|
||||||
):
|
):
|
||||||
log.runtime(
|
teardown_msg += (
|
||||||
f"Waiting for remaining peers {actor._peers} to clear")
|
f'-> Waiting for remaining peers {actor._peers} to clear..\n'
|
||||||
|
)
|
||||||
|
log.runtime(teardown_msg)
|
||||||
with CancelScope(shield=True):
|
with CancelScope(shield=True):
|
||||||
await actor._no_more_peers.wait()
|
await actor._no_more_peers.wait()
|
||||||
log.runtime("All peer channels are complete")
|
|
||||||
|
|
||||||
log.runtime("Runtime completed")
|
teardown_msg += ('-> All peer channels are complete\n')
|
||||||
|
|
||||||
|
teardown_msg += ('Actor runtime completed')
|
||||||
|
log.info(teardown_msg)
|
||||||
|
|
||||||
|
|
||||||
# TODO: rename to `Registry` and move to `._discovery`!
|
# TODO: rename to `Registry` and move to `._discovery`!
|
||||||
|
|
|
@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = {
|
||||||
'_root_mailbox': (None, None),
|
'_root_mailbox': (None, None),
|
||||||
'_registry_addrs': [],
|
'_registry_addrs': [],
|
||||||
|
|
||||||
# for `breakpoint()` support
|
# for `tractor.pause_from_sync()` & `breakpoint()` support
|
||||||
'use_greenback': False,
|
'use_greenback': False,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
@property
|
@property
|
||||||
def ctx(self) -> Context:
|
def ctx(self) -> Context:
|
||||||
'''
|
'''
|
||||||
This stream's IPC `Context` ref.
|
A read-only ref to this stream's inter-actor-task `Context`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._ctx
|
return self._ctx
|
||||||
|
|
|
@ -80,6 +80,7 @@ class ActorNursery:
|
||||||
'''
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
# TODO: maybe def these as fields of a struct looking type?
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
ria_nursery: trio.Nursery,
|
ria_nursery: trio.Nursery,
|
||||||
da_nursery: trio.Nursery,
|
da_nursery: trio.Nursery,
|
||||||
|
@ -88,8 +89,10 @@ class ActorNursery:
|
||||||
) -> None:
|
) -> None:
|
||||||
# self.supervisor = supervisor # TODO
|
# self.supervisor = supervisor # TODO
|
||||||
self._actor: Actor = actor
|
self._actor: Actor = actor
|
||||||
self._ria_nursery = ria_nursery
|
|
||||||
|
# TODO: rename to `._tn` for our conventional "task-nursery"
|
||||||
self._da_nursery = da_nursery
|
self._da_nursery = da_nursery
|
||||||
|
|
||||||
self._children: dict[
|
self._children: dict[
|
||||||
tuple[str, str],
|
tuple[str, str],
|
||||||
tuple[
|
tuple[
|
||||||
|
@ -98,15 +101,13 @@ class ActorNursery:
|
||||||
Portal | None,
|
Portal | None,
|
||||||
]
|
]
|
||||||
] = {}
|
] = {}
|
||||||
# portals spawned with ``run_in_actor()`` are
|
|
||||||
# cancelled when their "main" result arrives
|
|
||||||
self._cancel_after_result_on_exit: set = set()
|
|
||||||
self.cancelled: bool = False
|
self.cancelled: bool = False
|
||||||
self._join_procs = trio.Event()
|
self._join_procs = trio.Event()
|
||||||
self._at_least_one_child_in_debug: bool = False
|
self._at_least_one_child_in_debug: bool = False
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.exited = trio.Event()
|
|
||||||
self._scope_error: BaseException|None = None
|
self._scope_error: BaseException|None = None
|
||||||
|
self.exited = trio.Event()
|
||||||
|
|
||||||
# NOTE: when no explicit call is made to
|
# NOTE: when no explicit call is made to
|
||||||
# `.open_root_actor()` by application code,
|
# `.open_root_actor()` by application code,
|
||||||
|
@ -116,6 +117,13 @@ class ActorNursery:
|
||||||
# and syncing purposes to any actor opened nurseries.
|
# and syncing purposes to any actor opened nurseries.
|
||||||
self._implicit_runtime_started: bool = False
|
self._implicit_runtime_started: bool = False
|
||||||
|
|
||||||
|
# TODO: remove the `.run_in_actor()` API and thus this 2ndary
|
||||||
|
# nursery when that API get's moved outside this primitive!
|
||||||
|
self._ria_nursery = ria_nursery
|
||||||
|
# portals spawned with ``run_in_actor()`` are
|
||||||
|
# cancelled when their "main" result arrives
|
||||||
|
self._cancel_after_result_on_exit: set = set()
|
||||||
|
|
||||||
async def start_actor(
|
async def start_actor(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -126,10 +134,14 @@ class ActorNursery:
|
||||||
rpc_module_paths: list[str]|None = None,
|
rpc_module_paths: list[str]|None = None,
|
||||||
enable_modules: list[str]|None = None,
|
enable_modules: list[str]|None = None,
|
||||||
loglevel: str|None = None, # set log level per subactor
|
loglevel: str|None = None, # set log level per subactor
|
||||||
nursery: trio.Nursery|None = None,
|
|
||||||
debug_mode: bool|None = None,
|
debug_mode: bool|None = None,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
|
||||||
|
# TODO: ideally we can rm this once we no longer have
|
||||||
|
# a `._ria_nursery` since the dependent APIs have been
|
||||||
|
# removed!
|
||||||
|
nursery: trio.Nursery|None = None,
|
||||||
|
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
'''
|
'''
|
||||||
Start a (daemon) actor: an process that has no designated
|
Start a (daemon) actor: an process that has no designated
|
||||||
|
@ -200,6 +212,7 @@ class ActorNursery:
|
||||||
# |_ dynamic @context decoration on child side
|
# |_ dynamic @context decoration on child side
|
||||||
# |_ implicit `Portal.open_context() as (ctx, first):`
|
# |_ implicit `Portal.open_context() as (ctx, first):`
|
||||||
# and `return first` on parent side.
|
# and `return first` on parent side.
|
||||||
|
# |_ mention how it's similar to `trio-parallel` API?
|
||||||
# -[ ] use @api_frame on the wrapper
|
# -[ ] use @api_frame on the wrapper
|
||||||
async def run_in_actor(
|
async def run_in_actor(
|
||||||
self,
|
self,
|
||||||
|
@ -269,11 +282,14 @@ class ActorNursery:
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Cancel this nursery by instructing each subactor to cancel
|
Cancel this actor-nursery by instructing each subactor's
|
||||||
itself and wait for all subactors to terminate.
|
runtime to cancel and wait for all underlying sub-processes
|
||||||
|
to terminate.
|
||||||
|
|
||||||
If ``hard_killl`` is set to ``True`` then kill the processes
|
If `hard_kill` is set then kill the processes directly using
|
||||||
directly without any far end graceful ``trio`` cancellation.
|
the spawning-backend's API/OS-machinery without any attempt
|
||||||
|
at (graceful) `trio`-style cancellation using our
|
||||||
|
`Actor.cancel()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__runtimeframe__: int = 1 # noqa
|
__runtimeframe__: int = 1 # noqa
|
||||||
|
@ -629,8 +645,12 @@ async def open_nursery(
|
||||||
f'|_{an}\n'
|
f'|_{an}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# shutdown runtime if it was started
|
|
||||||
if implicit_runtime:
|
if implicit_runtime:
|
||||||
|
# shutdown runtime if it was started and report noisly
|
||||||
|
# that we're did so.
|
||||||
msg += '=> Shutting down actor runtime <=\n'
|
msg += '=> Shutting down actor runtime <=\n'
|
||||||
|
log.info(msg)
|
||||||
|
|
||||||
log.info(msg)
|
else:
|
||||||
|
# keep noise low during std operation.
|
||||||
|
log.runtime(msg)
|
||||||
|
|
|
@ -29,6 +29,7 @@ from ._debug import (
|
||||||
shield_sigint_handler as shield_sigint_handler,
|
shield_sigint_handler as shield_sigint_handler,
|
||||||
open_crash_handler as open_crash_handler,
|
open_crash_handler as open_crash_handler,
|
||||||
maybe_open_crash_handler as maybe_open_crash_handler,
|
maybe_open_crash_handler as maybe_open_crash_handler,
|
||||||
|
maybe_init_greenback as maybe_init_greenback,
|
||||||
post_mortem as post_mortem,
|
post_mortem as post_mortem,
|
||||||
mk_pdb as mk_pdb,
|
mk_pdb as mk_pdb,
|
||||||
)
|
)
|
||||||
|
|
|
@ -69,6 +69,7 @@ from trio import (
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._context import Context
|
from tractor._context import Context
|
||||||
|
from tractor import _state
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
is_root_process,
|
is_root_process,
|
||||||
|
@ -87,9 +88,6 @@ if TYPE_CHECKING:
|
||||||
from tractor._runtime import (
|
from tractor._runtime import (
|
||||||
Actor,
|
Actor,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
|
||||||
_codec,
|
|
||||||
)
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -1599,11 +1597,13 @@ async def _pause(
|
||||||
try:
|
try:
|
||||||
task: Task = current_task()
|
task: Task = current_task()
|
||||||
except RuntimeError as rte:
|
except RuntimeError as rte:
|
||||||
|
__tracebackhide__: bool = False
|
||||||
log.exception('Failed to get current task?')
|
log.exception('Failed to get current task?')
|
||||||
if actor.is_infected_aio():
|
if actor.is_infected_aio():
|
||||||
|
# mk_pdb().set_trace()
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
'`tractor.pause[_from_sync]()` not yet supported '
|
'`tractor.pause[_from_sync]()` not yet supported '
|
||||||
'for infected `asyncio` mode!'
|
'directly (infected) `asyncio` tasks!'
|
||||||
) from rte
|
) from rte
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
@ -2163,22 +2163,22 @@ def maybe_import_greenback(
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def maybe_init_greenback(
|
async def maybe_init_greenback(**kwargs) -> None|ModuleType:
|
||||||
**kwargs,
|
try:
|
||||||
) -> None|ModuleType:
|
if mod := maybe_import_greenback(**kwargs):
|
||||||
|
await mod.ensure_portal()
|
||||||
if mod := maybe_import_greenback(**kwargs):
|
log.devx(
|
||||||
await mod.ensure_portal()
|
'`greenback` portal opened!\n'
|
||||||
log.devx(
|
'Sync debug support activated!\n'
|
||||||
'`greenback` portal opened!\n'
|
)
|
||||||
'Sync debug support activated!\n'
|
return mod
|
||||||
)
|
except BaseException:
|
||||||
return mod
|
log.exception('Failed to init `greenback`..')
|
||||||
|
raise
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def _pause_from_bg_root_thread(
|
async def _pause_from_bg_root_thread(
|
||||||
behalf_of_thread: Thread,
|
behalf_of_thread: Thread,
|
||||||
repl: PdbREPL,
|
repl: PdbREPL,
|
||||||
|
@ -2399,18 +2399,37 @@ def pause_from_sync(
|
||||||
else: # we are presumably the `trio.run()` + main thread
|
else: # we are presumably the `trio.run()` + main thread
|
||||||
# raises on not-found by default
|
# raises on not-found by default
|
||||||
greenback: ModuleType = maybe_import_greenback()
|
greenback: ModuleType = maybe_import_greenback()
|
||||||
|
|
||||||
|
# TODO: how to ensure this is either dynamically (if
|
||||||
|
# needed) called here (in some bg tn??) or that the
|
||||||
|
# subactor always already called it?
|
||||||
|
# greenback: ModuleType = await maybe_init_greenback()
|
||||||
|
|
||||||
message += f'-> imported {greenback}\n'
|
message += f'-> imported {greenback}\n'
|
||||||
repl_owner: Task = current_task()
|
repl_owner: Task = current_task()
|
||||||
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
|
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
|
||||||
out = greenback.await_(
|
try:
|
||||||
_pause(
|
out = greenback.await_(
|
||||||
debug_func=None,
|
_pause(
|
||||||
repl=repl,
|
debug_func=None,
|
||||||
hide_tb=hide_tb,
|
repl=repl,
|
||||||
called_from_sync=True,
|
hide_tb=hide_tb,
|
||||||
**_pause_kwargs,
|
called_from_sync=True,
|
||||||
|
**_pause_kwargs,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except RuntimeError as rte:
|
||||||
|
if not _state._runtime_vars.get(
|
||||||
|
'use_greenback',
|
||||||
|
False,
|
||||||
|
):
|
||||||
|
raise RuntimeError(
|
||||||
|
'`greenback` was never initialized in this actor!?\n\n'
|
||||||
|
f'{_state._runtime_vars}\n'
|
||||||
|
) from rte
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
if out:
|
if out:
|
||||||
bg_task, repl = out
|
bg_task, repl = out
|
||||||
assert repl is repl
|
assert repl is repl
|
||||||
|
@ -2801,10 +2820,10 @@ def open_crash_handler(
|
||||||
`trio.run()`.
|
`trio.run()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
err: BaseException
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
except tuple(catch) as err:
|
except tuple(catch) as err:
|
||||||
|
|
||||||
if type(err) not in ignore:
|
if type(err) not in ignore:
|
||||||
pdbp.xpm()
|
pdbp.xpm()
|
||||||
|
|
||||||
|
|
|
@ -234,7 +234,7 @@ def find_caller_info(
|
||||||
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
|
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
|
||||||
|
|
||||||
|
|
||||||
# TODO: -[x] move all this into new `.devx._code`!
|
# TODO: -[x] move all this into new `.devx._frame_stack`!
|
||||||
# -[ ] consider rename to _callstack?
|
# -[ ] consider rename to _callstack?
|
||||||
# -[ ] prolly create a `@runtime_api` dec?
|
# -[ ] prolly create a `@runtime_api` dec?
|
||||||
# |_ @api_frame seems better?
|
# |_ @api_frame seems better?
|
||||||
|
@ -286,3 +286,18 @@ def api_frame(
|
||||||
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
|
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
|
||||||
wrapped.__api_func__: bool = True
|
wrapped.__api_func__: bool = True
|
||||||
return wrapper(wrapped)
|
return wrapper(wrapped)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: something like this instead of the adhoc frame-unhiding
|
||||||
|
# blocks all over the runtime!! XD
|
||||||
|
# -[ ] ideally we can expect a certain error (set) and if something
|
||||||
|
# else is raised then all frames below the wrapped one will be
|
||||||
|
# un-hidden via `__tracebackhide__: bool = False`.
|
||||||
|
# |_ might need to dynamically mutate the code objs like
|
||||||
|
# `pdbp.hideframe()` does?
|
||||||
|
# -[ ] use this as a `@acm` decorator as introed in 3.10?
|
||||||
|
# @acm
|
||||||
|
# async def unhide_frame_when_not(
|
||||||
|
# error_set: set[BaseException],
|
||||||
|
# ) -> TracebackType:
|
||||||
|
# ...
|
||||||
|
|
|
@ -0,0 +1,134 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2024-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Daemon subactor as service(s) management and supervision primitives
|
||||||
|
and API.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
from contextlib import (
|
||||||
|
# asynccontextmanager as acm,
|
||||||
|
contextmanager as cm,
|
||||||
|
)
|
||||||
|
from collections import defaultdict
|
||||||
|
from typing import (
|
||||||
|
Callable,
|
||||||
|
Any,
|
||||||
|
)
|
||||||
|
|
||||||
|
import trio
|
||||||
|
from trio import TaskStatus
|
||||||
|
from tractor import (
|
||||||
|
ActorNursery,
|
||||||
|
current_actor,
|
||||||
|
ContextCancelled,
|
||||||
|
Context,
|
||||||
|
Portal,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ._util import (
|
||||||
|
log, # sub-sys logger
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: implement a `@singleton` deco-API for wrapping the below
|
||||||
|
# factory's impl for general actor-singleton use?
|
||||||
|
#
|
||||||
|
# -[ ] go through the options peeps on SO did?
|
||||||
|
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
|
||||||
|
# * including @mikenerone's answer
|
||||||
|
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
|
||||||
|
#
|
||||||
|
# -[ ] put it in `tractor.lowlevel._globals` ?
|
||||||
|
# * fits with our oustanding actor-local/global feat req?
|
||||||
|
# |_ https://github.com/goodboy/tractor/issues/55
|
||||||
|
# * how can it relate to the `Actor.lifetime_stack` that was
|
||||||
|
# silently patched in?
|
||||||
|
# |_ we could implicitly call both of these in the same
|
||||||
|
# spot in the runtime using the lifetime stack?
|
||||||
|
# - `open_singleton_cm().__exit__()`
|
||||||
|
# -`del_singleton()`
|
||||||
|
# |_ gives SC fixtue semantics to sync code oriented around
|
||||||
|
# sub-process lifetime?
|
||||||
|
# * what about with `trio.RunVar`?
|
||||||
|
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
|
||||||
|
# - which we'll need for no-GIL cpython (right?) presuming
|
||||||
|
# multiple `trio.run()` calls in process?
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# @singleton
|
||||||
|
# async def open_service_mngr(
|
||||||
|
# **init_kwargs,
|
||||||
|
# ) -> ServiceMngr:
|
||||||
|
# '''
|
||||||
|
# Note this function body is invoke IFF no existing singleton instance already
|
||||||
|
# exists in this proc's memory.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# # setup
|
||||||
|
# yield ServiceMngr(**init_kwargs)
|
||||||
|
# # teardown
|
||||||
|
|
||||||
|
|
||||||
|
# a deletion API for explicit instance de-allocation?
|
||||||
|
# @open_service_mngr.deleter
|
||||||
|
# def del_service_mngr() -> None:
|
||||||
|
# mngr = open_service_mngr._singleton[0]
|
||||||
|
# open_service_mngr._singleton[0] = None
|
||||||
|
# del mngr
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: singleton factory API instead of a class API
|
||||||
|
@cm
|
||||||
|
def open_service_mngr(
|
||||||
|
*,
|
||||||
|
_singleton: list[ServiceMngr|None] = [None],
|
||||||
|
# NOTE; since default values for keyword-args are effectively
|
||||||
|
# module-vars/globals as per the note from,
|
||||||
|
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
|
||||||
|
#
|
||||||
|
# > "The default value is evaluated only once. This makes
|
||||||
|
# a difference when the default is a mutable object such as
|
||||||
|
# a list, dictionary, or instances of most classes"
|
||||||
|
#
|
||||||
|
**init_kwargs,
|
||||||
|
|
||||||
|
) -> ServiceMngr:
|
||||||
|
'''
|
||||||
|
Open a multi-subactor-as-service-daemon tree supervisor.
|
||||||
|
|
||||||
|
The delivered `ServiceMngr` is a singleton instance for each
|
||||||
|
actor-process and is allocated on first open and never
|
||||||
|
de-allocated unless explicitly deleted by al call to
|
||||||
|
`del_service_mngr()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
mngr: ServiceMngr|None
|
||||||
|
if (mngr := _singleton[0]) is None:
|
||||||
|
log.info('Allocating a new service mngr!')
|
||||||
|
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
|
||||||
|
else:
|
||||||
|
log.info(
|
||||||
|
'Using extant service mngr!\n\n'
|
||||||
|
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
||||||
|
)
|
||||||
|
|
||||||
|
with mngr:
|
||||||
|
yield mngr
|
||||||
|
|
||||||
|
|
|
@ -54,11 +54,12 @@ LOG_FORMAT = (
|
||||||
DATE_FORMAT = '%b %d %H:%M:%S'
|
DATE_FORMAT = '%b %d %H:%M:%S'
|
||||||
|
|
||||||
# FYI, ERROR is 40
|
# FYI, ERROR is 40
|
||||||
|
# TODO: use a `bidict` to avoid the :155 check?
|
||||||
CUSTOM_LEVELS: dict[str, int] = {
|
CUSTOM_LEVELS: dict[str, int] = {
|
||||||
'TRANSPORT': 5,
|
'TRANSPORT': 5,
|
||||||
'RUNTIME': 15,
|
'RUNTIME': 15,
|
||||||
'DEVX': 17,
|
'DEVX': 17,
|
||||||
'CANCEL': 18,
|
'CANCEL': 22,
|
||||||
'PDB': 500,
|
'PDB': 500,
|
||||||
}
|
}
|
||||||
STD_PALETTE = {
|
STD_PALETTE = {
|
||||||
|
@ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter):
|
||||||
Delegate a log call to the underlying logger, after adding
|
Delegate a log call to the underlying logger, after adding
|
||||||
contextual information from this adapter instance.
|
contextual information from this adapter instance.
|
||||||
|
|
||||||
|
NOTE: all custom level methods (above) delegate to this!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if self.isEnabledFor(level):
|
if self.isEnabledFor(level):
|
||||||
stacklevel: int = 3
|
stacklevel: int = 3
|
||||||
|
|
|
@ -41,8 +41,10 @@ import textwrap
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
|
Protocol,
|
||||||
Type,
|
Type,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
TypeVar,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
@ -181,7 +183,11 @@ def mk_dec(
|
||||||
dec_hook: Callable|None = None,
|
dec_hook: Callable|None = None,
|
||||||
|
|
||||||
) -> MsgDec:
|
) -> MsgDec:
|
||||||
|
'''
|
||||||
|
Create an IPC msg decoder, normally used as the
|
||||||
|
`PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
|
||||||
|
|
||||||
|
'''
|
||||||
return MsgDec(
|
return MsgDec(
|
||||||
_dec=msgpack.Decoder(
|
_dec=msgpack.Decoder(
|
||||||
type=spec, # like `MsgType[Any]`
|
type=spec, # like `MsgType[Any]`
|
||||||
|
@ -227,6 +233,13 @@ def pformat_msgspec(
|
||||||
join_char: str = '\n',
|
join_char: str = '\n',
|
||||||
|
|
||||||
) -> str:
|
) -> str:
|
||||||
|
'''
|
||||||
|
Pretty `str` format the `msgspec.msgpack.Decoder.type` attributed
|
||||||
|
for display in log messages as a nice (maybe multiline)
|
||||||
|
presentation of all the supported `Struct`s availed for typed
|
||||||
|
decoding.
|
||||||
|
|
||||||
|
'''
|
||||||
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
|
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
|
||||||
return join_char.join(
|
return join_char.join(
|
||||||
mk_msgspec_table(
|
mk_msgspec_table(
|
||||||
|
@ -630,31 +643,57 @@ def limit_msg_spec(
|
||||||
# # import pdbp; pdbp.set_trace()
|
# # import pdbp; pdbp.set_trace()
|
||||||
# assert ext_codec.pld_spec == extended_spec
|
# assert ext_codec.pld_spec == extended_spec
|
||||||
# yield ext_codec
|
# yield ext_codec
|
||||||
|
#
|
||||||
|
# ^-TODO-^ is it impossible to make something like this orr!?
|
||||||
|
|
||||||
|
# TODO: make an auto-custom hook generator from a set of input custom
|
||||||
|
# types?
|
||||||
|
# -[ ] below is a proto design using a `TypeCodec` idea?
|
||||||
|
#
|
||||||
|
# type var for the expected interchange-lib's
|
||||||
|
# IPC-transport type when not available as a built-in
|
||||||
|
# serialization output.
|
||||||
|
WireT = TypeVar('WireT')
|
||||||
|
|
||||||
|
|
||||||
# TODO: make something similar to this inside `._codec` such that
|
# TODO: some kinda (decorator) API for built-in subtypes
|
||||||
# user can just pass a type table of some sort?
|
# that builds this implicitly by inspecting the `mro()`?
|
||||||
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
|
class TypeCodec(Protocol):
|
||||||
# and then call `.to_dict()` on them?
|
'''
|
||||||
# -[x] we're going to need to re-impl all the stuff changed in the
|
A per-custom-type wire-transport serialization translator
|
||||||
# runtime port such that it can handle dicts or `Msg`s?
|
description type.
|
||||||
#
|
|
||||||
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
|
'''
|
||||||
# '''
|
src_type: Type
|
||||||
# Deliver a `enc_hook()`/`dec_hook()` pair which does
|
wire_type: WireT
|
||||||
# manual convertion from our above native `Msg` set
|
|
||||||
# to `dict` equivalent (wire msgs) in order to keep legacy compat
|
def encode(obj: Type) -> WireT:
|
||||||
# with the original runtime implementation.
|
...
|
||||||
#
|
|
||||||
# Note: this is is/was primarly used while moving the core
|
def decode(
|
||||||
# runtime over to using native `Msg`-struct types wherein we
|
obj_type: Type[WireT],
|
||||||
# start with the send side emitting without loading
|
obj: WireT,
|
||||||
# a typed-decoder and then later flipping the switch over to
|
) -> Type:
|
||||||
# load to the native struct types once all runtime usage has
|
...
|
||||||
# been adjusted appropriately.
|
|
||||||
#
|
|
||||||
# '''
|
class MsgpackTypeCodec(TypeCodec):
|
||||||
# return (
|
...
|
||||||
# # enc_to_dict,
|
|
||||||
# dec_from_dict,
|
|
||||||
# )
|
def mk_codec_hooks(
|
||||||
|
type_codecs: list[TypeCodec],
|
||||||
|
|
||||||
|
) -> tuple[Callable, Callable]:
|
||||||
|
'''
|
||||||
|
Deliver a `enc_hook()`/`dec_hook()` pair which handle
|
||||||
|
manual convertion from an input `Type` set such that whenever
|
||||||
|
the `TypeCodec.filter()` predicate matches the
|
||||||
|
`TypeCodec.decode()` is called on the input native object by
|
||||||
|
the `dec_hook()` and whenever the
|
||||||
|
`isiinstance(obj, TypeCodec.type)` matches against an
|
||||||
|
`enc_hook(obj=obj)` the return value is taken from a
|
||||||
|
`TypeCodec.encode(obj)` callback.
|
||||||
|
|
||||||
|
'''
|
||||||
|
...
|
||||||
|
|
|
@ -580,12 +580,15 @@ async def drain_to_final_msg(
|
||||||
# 2. WE DID NOT REQUEST that cancel and thus
|
# 2. WE DID NOT REQUEST that cancel and thus
|
||||||
# SHOULD RAISE HERE!
|
# SHOULD RAISE HERE!
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
|
# from tractor.devx._debug import pause
|
||||||
|
# await pause(shield=True)
|
||||||
|
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
# cancellation.
|
# cancellation.
|
||||||
ctx.maybe_raise(
|
ctx.maybe_raise(
|
||||||
|
hide_tb=hide_tb,
|
||||||
# TODO: when use this/
|
# TODO: when use this/
|
||||||
# from_src_exc=taskc,
|
# from_src_exc=taskc,
|
||||||
)
|
)
|
||||||
|
|
|
@ -34,6 +34,9 @@ from pprint import (
|
||||||
saferepr,
|
saferepr,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
log = get_logger()
|
||||||
# TODO: auto-gen type sig for input func both for
|
# TODO: auto-gen type sig for input func both for
|
||||||
# type-msgs and logging of RPC tasks?
|
# type-msgs and logging of RPC tasks?
|
||||||
# taken and modified from:
|
# taken and modified from:
|
||||||
|
@ -143,7 +146,13 @@ def pformat(
|
||||||
|
|
||||||
else: # the `pprint` recursion-safe format:
|
else: # the `pprint` recursion-safe format:
|
||||||
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
||||||
val_str: str = saferepr(v)
|
try:
|
||||||
|
val_str: str = saferepr(v)
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
'Failed to `saferepr({type(struct)})` !?\n'
|
||||||
|
)
|
||||||
|
return _Struct.__repr__(struct)
|
||||||
|
|
||||||
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
||||||
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
||||||
|
@ -194,12 +203,20 @@ class Struct(
|
||||||
return sin_props
|
return sin_props
|
||||||
|
|
||||||
pformat = pformat
|
pformat = pformat
|
||||||
|
# __repr__ = pformat
|
||||||
# __str__ = __repr__ = pformat
|
# __str__ = __repr__ = pformat
|
||||||
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
||||||
# inside a known tty?
|
# inside a known tty?
|
||||||
# def __repr__(self) -> str:
|
# def __repr__(self) -> str:
|
||||||
# ...
|
# ...
|
||||||
__repr__ = pformat
|
def __repr__(self) -> str:
|
||||||
|
try:
|
||||||
|
return pformat(self)
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
f'Failed to `pformat({type(self)})` !?\n'
|
||||||
|
)
|
||||||
|
return _Struct.__repr__(self)
|
||||||
|
|
||||||
def copy(
|
def copy(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -156,11 +156,12 @@ class BroadcastState(Struct):
|
||||||
|
|
||||||
class BroadcastReceiver(ReceiveChannel):
|
class BroadcastReceiver(ReceiveChannel):
|
||||||
'''
|
'''
|
||||||
A memory receive channel broadcaster which is non-lossy for the
|
A memory receive channel broadcaster which is non-lossy for
|
||||||
fastest consumer.
|
the fastest consumer.
|
||||||
|
|
||||||
Additional consumer tasks can receive all produced values by registering
|
Additional consumer tasks can receive all produced values by
|
||||||
with ``.subscribe()`` and receiving from the new instance it delivers.
|
registering with ``.subscribe()`` and receiving from the new
|
||||||
|
instance it delivers.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
|
@ -18,8 +18,12 @@
|
||||||
Async context manager primitives with hard ``trio``-aware semantics
|
Async context manager primitives with hard ``trio``-aware semantics
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from __future__ import annotations
|
||||||
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
)
|
||||||
import inspect
|
import inspect
|
||||||
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncContextManager,
|
AsyncContextManager,
|
||||||
|
@ -30,13 +34,16 @@ from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
Sequence,
|
Sequence,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor import ActorNursery
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -46,8 +53,10 @@ T = TypeVar("T")
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_nursery(
|
async def maybe_open_nursery(
|
||||||
nursery: trio.Nursery | None = None,
|
nursery: trio.Nursery|ActorNursery|None = None,
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
|
lib: ModuleType = trio,
|
||||||
|
|
||||||
) -> AsyncGenerator[trio.Nursery, Any]:
|
) -> AsyncGenerator[trio.Nursery, Any]:
|
||||||
'''
|
'''
|
||||||
Create a new nursery if None provided.
|
Create a new nursery if None provided.
|
||||||
|
@ -58,13 +67,12 @@ async def maybe_open_nursery(
|
||||||
if nursery is not None:
|
if nursery is not None:
|
||||||
yield nursery
|
yield nursery
|
||||||
else:
|
else:
|
||||||
async with trio.open_nursery() as nursery:
|
async with lib.open_nursery() as nursery:
|
||||||
nursery.cancel_scope.shield = shield
|
nursery.cancel_scope.shield = shield
|
||||||
yield nursery
|
yield nursery
|
||||||
|
|
||||||
|
|
||||||
async def _enter_and_wait(
|
async def _enter_and_wait(
|
||||||
|
|
||||||
mngr: AsyncContextManager[T],
|
mngr: AsyncContextManager[T],
|
||||||
unwrapped: dict[int, T],
|
unwrapped: dict[int, T],
|
||||||
all_entered: trio.Event,
|
all_entered: trio.Event,
|
||||||
|
@ -91,7 +99,6 @@ async def _enter_and_wait(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def gather_contexts(
|
async def gather_contexts(
|
||||||
|
|
||||||
mngrs: Sequence[AsyncContextManager[T]],
|
mngrs: Sequence[AsyncContextManager[T]],
|
||||||
|
|
||||||
) -> AsyncGenerator[
|
) -> AsyncGenerator[
|
||||||
|
@ -102,15 +109,17 @@ async def gather_contexts(
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Concurrently enter a sequence of async context managers, each in
|
Concurrently enter a sequence of async context managers (acms),
|
||||||
a separate ``trio`` task and deliver the unwrapped values in the
|
each from a separate `trio` task and deliver the unwrapped
|
||||||
same order once all managers have entered. On exit all contexts are
|
`yield`-ed values in the same order once all managers have entered.
|
||||||
subsequently and concurrently exited.
|
|
||||||
|
|
||||||
This function is somewhat similar to common usage of
|
On exit, all acms are subsequently and concurrently exited.
|
||||||
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
|
|
||||||
combo with ``asyncio.gather()`` except the managers are concurrently
|
This function is somewhat similar to a batch of non-blocking
|
||||||
entered and exited, and cancellation just works.
|
calls to `contextlib.AsyncExitStack.enter_async_context()`
|
||||||
|
(inside a loop) *in combo with* a `asyncio.gather()` to get the
|
||||||
|
`.__aenter__()`-ed values, except the managers are both
|
||||||
|
concurrently entered and exited and *cancellation just works*(R).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
seed: int = id(mngrs)
|
seed: int = id(mngrs)
|
||||||
|
@ -210,9 +219,10 @@ async def maybe_open_context(
|
||||||
|
|
||||||
) -> AsyncIterator[tuple[bool, T]]:
|
) -> AsyncIterator[tuple[bool, T]]:
|
||||||
'''
|
'''
|
||||||
Maybe open a context manager if there is not already a _Cached
|
Maybe open an async-context-manager (acm) if there is not already
|
||||||
version for the provided ``key`` for *this* actor. Return the
|
a `_Cached` version for the provided (input) `key` for *this* actor.
|
||||||
_Cached instance on a _Cache hit.
|
|
||||||
|
Return the `_Cached` instance on a _Cache hit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
fid = id(acm_func)
|
fid = id(acm_func)
|
||||||
|
@ -273,8 +283,13 @@ async def maybe_open_context(
|
||||||
else:
|
else:
|
||||||
_Cache.users += 1
|
_Cache.users += 1
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
|
f'Re-using cached resource for user {_Cache.users}\n\n'
|
||||||
f'{ctx_key!r} -> {yielded!r}\n'
|
f'{ctx_key!r} -> {type(yielded)}\n'
|
||||||
|
|
||||||
|
# TODO: make this work with values but without
|
||||||
|
# `msgspec.Struct` causing frickin crashes on field-type
|
||||||
|
# lookups..
|
||||||
|
# f'{ctx_key!r} -> {yielded!r}\n'
|
||||||
)
|
)
|
||||||
lock.release()
|
lock.release()
|
||||||
yield True, yielded
|
yield True, yielded
|
||||||
|
|
Loading…
Reference in New Issue