Compare commits: 2bb33da9c8 ... 48b6db5c68

11 Commits:
- 48b6db5c68
- 029888cee8
- 223d885e22
- a1f091882e
- e587f0da23
- 5138224625
- ad72cd629f
- 533e69baaf
- fbc9325184
- 3cd222959a
- 2ea703cc75
```diff
@@ -6,21 +6,22 @@ from __future__ import annotations
 import sys
 import subprocess
 import os
-import random
 import signal
 import platform
 import time

 import pytest
-import tractor
 from tractor._testing import (
     examples_dir as examples_dir,
     tractor_test as tractor_test,
     expect_ctxc as expect_ctxc,
 )

 # TODO: include wtv plugin(s) we build in `._testing.pytest`?
-pytest_plugins = ['pytester']
+pytest_plugins: list[str] = [
+    'pytester',
+    'tractor._testing.pytest',
+]


 # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
 if platform.system() == 'Windows':
```
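For readers unfamiliar with the mechanism: listing a module path in `pytest_plugins` makes `pytest` import that module and register any hook functions and fixtures it defines, which is what lets the options and fixtures moved out of `conftest.py` in the hunks below keep working. A minimal sketch of such a plugin module follows (hypothetical `myproj/_testing/pytest.py` and `--my-flag` names, not the actual `tractor` plugin):

```python
# myproj/_testing/pytest.py -- minimal pytest plugin module (sketch).
# Registered via `pytest_plugins = ['myproj._testing.pytest']` in a
# conftest.py, pytest imports it and picks up these hooks/fixtures.
import pytest


def pytest_addoption(parser: pytest.Parser):
    # standard pytest hook: expose a custom CLI flag
    parser.addoption(
        '--my-flag',
        action='store_true',
        dest='my_flag',
        help='example boolean option exposed by the plugin',
    )


@pytest.fixture(scope='session')
def my_flag(request) -> bool:
    # fixtures defined in a plugin are injectable into any test
    return request.config.option.my_flag
```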
```diff
@@ -48,6 +49,9 @@ no_windows = pytest.mark.skipif(
 def pytest_addoption(
     parser: pytest.Parser,
 ):
+    # ?TODO? should this be exposed from our `._testing.pytest`
+    # plugin or should we make it more explicit with `--tl` for
+    # tractor logging like we do in other client projects?
     parser.addoption(
         "--ll",
         action="store",
@@ -55,54 +59,10 @@ def pytest_addoption(
         default='ERROR', help="logging level to set when testing"
     )

-    parser.addoption(
-        "--spawn-backend",
-        action="store",
-        dest='spawn_backend',
-        default='trio',
-        help="Processing spawning backend to use for test run",
-    )
-
-    parser.addoption(
-        "--tpdb",
-        "--debug-mode",
-        action="store_true",
-        dest='tractor_debug_mode',
-        # default=False,
-        help=(
-            'Enable a flag that can be used by tests to to set the '
-            '`debug_mode: bool` for engaging the internal '
-            'multi-proc debugger sys.'
-        ),
-    )
-
-    # provide which IPC transport protocols opting-in test suites
-    # should accumulatively run against.
-    parser.addoption(
-        "--tpt-proto",
-        nargs='+',  # accumulate-multiple-args
-        action="store",
-        dest='tpt_protos',
-        default=['tcp'],
-        help="Transport protocol to use under the `tractor.ipc.Channel`",
-    )
-
-
-def pytest_configure(config):
-    backend = config.option.spawn_backend
-    tractor._spawn.try_set_start_method(backend)
-
-
-@pytest.fixture(scope='session')
-def debug_mode(request) -> bool:
-    debug_mode: bool = request.config.option.tractor_debug_mode
-    # if debug_mode:
-    #     breakpoint()
-    return debug_mode
-
-
 @pytest.fixture(scope='session', autouse=True)
 def loglevel(request):
+    import tractor
     orig = tractor.log._default_loglevel
     level = tractor.log._default_loglevel = request.config.option.loglevel
     tractor.log.get_console_log(level)
@@ -110,49 +70,6 @@ def loglevel(request):
     tractor.log._default_loglevel = orig


-@pytest.fixture(scope='session')
-def spawn_backend(request) -> str:
-    return request.config.option.spawn_backend
-
-
-@pytest.fixture(scope='session')
-def tpt_protos(request) -> list[str]:
-
-    # allow quoting on CLI
-    proto_keys: list[str] = [
-        proto_key.replace('"', '').replace("'", "")
-        for proto_key in request.config.option.tpt_protos
-    ]
-
-    # ?TODO, eventually support multiple protos per test-sesh?
-    if len(proto_keys) > 1:
-        pytest.fail(
-            'We only support one `--tpt-proto <key>` atm!\n'
-        )
-
-    # XXX ensure we support the protocol by name via lookup!
-    for proto_key in proto_keys:
-        addr_type = tractor._addr._address_types[proto_key]
-        assert addr_type.proto_key == proto_key
-
-    yield proto_keys
-
-
-@pytest.fixture(
-    scope='session',
-    autouse=True,
-)
-def tpt_proto(
-    tpt_protos: list[str],
-) -> str:
-    proto_key: str = tpt_protos[0]
-    from tractor import _state
-    if _state._def_tpt_proto != proto_key:
-        _state._def_tpt_proto = proto_key
-        # breakpoint()
-    yield proto_key
-
-
 _ci_env: bool = os.environ.get('CI', False)
```
```diff
@@ -165,80 +82,6 @@ def ci_env() -> bool:
     return _ci_env


-# TODO: also move this to `._testing` for now?
-# -[ ] possibly generalize and re-use for multi-tree spawning
-#    along with the new stuff for multi-addrs?
-#
-# choose random port at import time
-_rando_port: str = random.randint(1000, 9999)
-
-
-@pytest.fixture(scope='session')
-def reg_addr(
-    tpt_proto: str,
-) -> tuple[str, int|str]:
-
-    # globally override the runtime to the per-test-session-dynamic
-    # addr so that all tests never conflict with any other actor
-    # tree using the default.
-    from tractor import (
-        _addr,
-    )
-    addr_type = _addr._address_types[tpt_proto]
-    def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
-
-    testrun_reg_addr: tuple[str, int]
-    match tpt_proto:
-        case 'tcp':
-            testrun_reg_addr = (
-                addr_type.def_bindspace,
-                _rando_port,
-            )
-
-        # NOTE, file-name uniqueness (no-collisions) will be based on
-        # the runtime-directory and root (pytest-proc's) pid.
-        case 'uds':
-            testrun_reg_addr = addr_type.get_random().unwrap()
-
-    assert def_reg_addr != testrun_reg_addr
-    return testrun_reg_addr
-
-
-def pytest_generate_tests(metafunc):
-    spawn_backend: str = metafunc.config.option.spawn_backend
-
-    if not spawn_backend:
-        # XXX some weird windows bug with `pytest`?
-        spawn_backend = 'trio'
-
-    # TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
-    assert spawn_backend in (
-        'mp_spawn',
-        'mp_forkserver',
-        'trio',
-    )
-
-    # NOTE: used-to-be-used-to dyanmically parametrize tests for when
-    # you just passed --spawn-backend=`mp` on the cli, but now we expect
-    # that cli input to be manually specified, BUT, maybe we'll do
-    # something like this again in the future?
-    if 'start_method' in metafunc.fixturenames:
-        metafunc.parametrize(
-            "start_method",
-            [spawn_backend],
-            scope='module',
-        )
-
-    # TODO, parametrize any `tpt_proto: str` declaring tests!
-    # proto_tpts: list[str] = metafunc.config.option.proto_tpts
-    # if 'tpt_proto' in metafunc.fixturenames:
-    #     metafunc.parametrize(
-    #         'tpt_proto',
-    #         proto_tpts,  # TODO, double check this list usage!
-    #         scope='module',
-    #     )


 def sig_prog(
     proc: subprocess.Popen,
     sig: int,
```
```diff
@@ -0,0 +1,4 @@
+'''
+`tractor.ipc` subsystem(s)/unit testing suites.
+
+'''
```
```diff
@@ -0,0 +1,72 @@
+'''
+High-level `.ipc._server` unit tests.
+
+'''
+from __future__ import annotations
+
+import pytest
+import trio
+from tractor import (
+    devx,
+    ipc,
+    log,
+)
+from tractor._testing.addr import (
+    get_rando_addr,
+)
+# TODO, use/check-roundtripping with some of these wrapper types?
+#
+# from .._addr import Address
+# from ._chan import Channel
+# from ._transport import MsgTransport
+# from ._uds import UDSAddress
+# from ._tcp import TCPAddress
+
+
+@pytest.mark.parametrize(
+    '_tpt_proto',
+    ['uds', 'tcp']
+)
+def test_basic_ipc_server(
+    _tpt_proto: str,
+    debug_mode: bool,
+    loglevel: str,
+):
+
+    # so we see the socket-listener reporting on console
+    log.get_console_log("INFO")
+
+    rando_addr: tuple = get_rando_addr(
+        tpt_proto=_tpt_proto,
+    )
+    async def main():
+        async with ipc._server.open_ipc_server() as server:
+
+            assert (
+                server._parent_tn
+                and
+                server._parent_tn is server._stream_handler_tn
+            )
+            assert server._no_more_peers.is_set()
+
+            eps: list[ipc.IPCEndpoint] = await server.listen_on(
+                accept_addrs=[rando_addr],
+                stream_handler_nursery=None,
+            )
+            assert (
+                len(eps) == 1
+                and
+                (ep := eps[0])._listener
+                and
+                not ep.peer_tpts
+            )
+
+            server._parent_tn.cancel_scope.cancel()
+
+        # !TODO! actually make a bg-task connection from a client
+        # using `ipc._chan._connect_chan()`
+
+    with devx.maybe_open_crash_handler(
+        pdb=debug_mode,
+    ):
+        trio.run(main)
```
```diff
@@ -83,3 +83,26 @@ def test_implicit_root_via_first_nursery(
         assert tractor.current_actor().aid.name == 'root'

     trio.run(main)
+
+
+def test_runtime_vars_unset(
+    reg_addr: tuple,
+    debug_mode: bool
+):
+    '''
+    Ensure any `._state._runtime_vars` are restored to default values
+    after the root actor-runtime exits!
+
+    '''
+    assert not tractor._state._runtime_vars['_debug_mode']
+    async def main():
+        assert not tractor._state._runtime_vars['_debug_mode']
+        async with tractor.open_nursery(
+            debug_mode=True,
+        ):
+            assert tractor._state._runtime_vars['_debug_mode']
+
+        # after runtime closure, should be reverted!
+        assert not tractor._state._runtime_vars['_debug_mode']
+
+    trio.run(main)
```
```diff
@@ -22,7 +22,6 @@ from __future__ import annotations
 from functools import partial
 import multiprocessing as mp
 import os
-import textwrap
 from typing import (
     Any,
     TYPE_CHECKING,
@@ -35,7 +34,10 @@ from .log import (
     get_logger,
 )
 from . import _state
-from .devx import _debug
+from .devx import (
+    _debug,
+    pformat,
+)
 from .to_asyncio import run_as_asyncio_guest
 from ._addr import UnwrappedAddress
 from ._runtime import (
@@ -103,107 +105,6 @@ def _mp_main(
     )


-# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
-# as we work out our multi-domain state-flow-syntax!
-def nest_from_op(
-    input_op: str,
-    #
-    # ?TODO? an idea for a syntax to the state of concurrent systems
-    # as a "3-domain" (execution, scope, storage) model and using
-    # a minimal ascii/utf-8 operator-set.
-    #
-    # try not to take any of this seriously yet XD
-    #
-    # > is a "play operator" indicating (CPU bound)
-    #   exec/work/ops required at the "lowest level computing"
-    #
-    # execution primititves (tasks, threads, actors..) denote their
-    # lifetime with '(' and ')' since parentheses normally are used
-    # in many langs to denote function calls.
-    #
-    # starting = (
-    # >( opening/starting; beginning of the thread-of-exec (toe?)
-    # (> opened/started, (finished spawning toe)
-    # |_<Task: blah blah..> repr of toe, in py these look like <objs>
-    #
-    # >) closing/exiting/stopping,
-    # )> closed/exited/stopped,
-    # |_<Task: blah blah..>
-    #   [OR <), )< ?? ]
-    #
-    # ending = )
-    # >c) cancelling to close/exit
-    # c)> cancelled (caused close), OR?
-    #  |_<Actor: ..>
-    #   OR maybe "<c)" which better indicates the cancel being
-    #   "delivered/returned" / returned" to LHS?
-    #
-    # >x) erroring to eventuall exit
-    # x)> errored and terminated
-    #  |_<Actor: ...>
-    #
-    # scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
-    # >{ opening
-    # {> opened
-    # }> closed
-    # >} closing
-    #
-    # storage: like queues, shm-buffers, files, etc..
-    # >[ opening
-    # [> opened
-    #  |_<FileObj: ..>
-    #
-    # >] closing
-    # ]> closed
-
-    # IPC ops: channels, transports, msging
-    # => req msg
-    # <= resp msg
-    # <=> 2-way streaming (of msgs)
-    # <- recv 1 msg
-    # -> send 1 msg
-    #
-    # TODO: still not sure on R/L-HS approach..?
-    # =>( send-req to exec start (task, actor, thread..)
-    # (<= recv-req to ^
-    #
-    # (<= recv-req ^
-    # <=( recv-resp opened remote exec primitive
-    # <=) recv-resp closed
-    #
-    # )<=c req to stop due to cancel
-    # c=>) req to stop due to cancel
-    #
-    # =>{ recv-req to open
-    # <={ send-status that it closed
-
-    tree_str: str,
-
-    # NOTE: so move back-from-the-left of the `input_op` by
-    # this amount.
-    back_from_op: int = 0,
-) -> str:
-    '''
-    Depth-increment the input (presumably hierarchy/supervision)
-    input "tree string" below the provided `input_op` execution
-    operator, so injecting a `"\n|_{input_op}\n"`and indenting the
-    `tree_str` to nest content aligned with the ops last char.
-
-    '''
-    return (
-        f'{input_op}\n'
-        +
-        textwrap.indent(
-            tree_str,
-            prefix=(
-                len(input_op)
-                -
-                (back_from_op + 1)
-            ) * ' ',
-        )
-    )
-
-
 def _trio_main(
     actor: Actor,
     *,
@@ -236,7 +137,7 @@ def _trio_main(
     log.info(
         'Starting new `trio` subactor:\n'
         +
-        nest_from_op(
+        pformat.nest_from_op(
             input_op='>(',  # see syntax ideas above
             tree_str=actor_info,
             back_from_op=2,  # since "complete"
@@ -246,7 +147,7 @@ def _trio_main(
     exit_status: str = (
         'Subactor exited\n'
         +
-        nest_from_op(
+        pformat.nest_from_op(
            input_op=')>',  # like a "closed-to-play"-icon from super perspective
            tree_str=actor_info,
            back_from_op=1,
@@ -264,7 +165,7 @@ def _trio_main(
         exit_status: str = (
             'Actor received KBI (aka an OS-cancel)\n'
             +
-            nest_from_op(
+            pformat.nest_from_op(
                 input_op='c)>',  # closed due to cancel (see above)
                 tree_str=actor_info,
             )
@@ -274,7 +175,7 @@ def _trio_main(
         exit_status: str = (
             'Main actor task exited due to crash?\n'
             +
-            nest_from_op(
+            pformat.nest_from_op(
                 input_op='x)>',  # closed by error
                 tree_str=actor_info,
             )
```
```diff
@@ -582,8 +582,7 @@ async def open_portal(
         msg_loop_cs = await tn.start(
             partial(
                 _rpc.process_messages,
-                actor,
-                channel,
+                chan=channel,
                 # if the local task is cancelled we want to keep
                 # the msg loop running until our block ends
                 shield=True,
```
```diff
@@ -276,7 +276,8 @@ async def open_root_actor(

     if (
         debug_mode
-        and _spawn._spawn_method == 'trio'
+        and
+        _spawn._spawn_method == 'trio'
     ):
         _state._runtime_vars['_debug_mode'] = True

@@ -512,8 +513,17 @@ async def open_root_actor(
             )
             await actor.cancel(None)  # self cancel
     finally:
+        # revert all process-global runtime state
+        if (
+            debug_mode
+            and
+            _spawn._spawn_method == 'trio'
+        ):
+            _state._runtime_vars['_debug_mode'] = False
+
         _state._current_actor = None
         _state._last_actor_terminated = actor

         logger.runtime(
             f'Root actor terminated\n'
             f')>\n'
```
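The second hunk above mirrors the `_debug_mode = True` set from the first: process-global runtime state is now reverted in the `finally:` teardown path so it cannot leak into a later runtime (or test) in the same process, which is exactly what the new `test_runtime_vars_unset` asserts. A minimal, self-contained sketch of that set-then-revert pattern (standalone names, not the actual `tractor` internals):

```python
# Sketch of the set-then-revert pattern for process-global runtime
# state, assuming a module-level dict like tractor's `_runtime_vars`.
from contextlib import contextmanager

_runtime_vars: dict[str, bool] = {'_debug_mode': False}


@contextmanager
def open_runtime(debug_mode: bool = False):
    if debug_mode:
        _runtime_vars['_debug_mode'] = True
    try:
        yield
    finally:
        # revert on exit so the next runtime (or test) starts clean
        if debug_mode:
            _runtime_vars['_debug_mode'] = False


with open_runtime(debug_mode=True):
    assert _runtime_vars['_debug_mode']

# after closure the flag is restored to its default
assert not _runtime_vars['_debug_mode']
```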
```diff
@@ -869,7 +869,6 @@ async def try_ship_error_to_remote(


 async def process_messages(
-    actor: Actor,
     chan: Channel,
     shield: bool = False,
     task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -907,6 +906,7 @@ async def process_messages(
     (as utilized inside `Portal.cancel_actor()` ).

     '''
+    actor: Actor = _state.current_actor()
     assert actor._service_n  # runtime state sanity

     # TODO: once `trio` get's an "obvious way" for req/resp we
```
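These hunks drop the explicit `actor` positional arg from `process_messages()` in favor of resolving the process-local actor from module state via `_state.current_actor()`, the same simplification applied to the `_portal`, `_runtime` and `ipc._server` call sites in this compare. A simplified stand-in for that ambient-lookup pattern (hypothetical mini-module, not the real `tractor._state`):

```python
# Sketch of the ambient "current actor" lookup replacing an explicit
# `actor` parameter; names mirror tractor's `_state` module but this
# is a simplified stand-in, not the real implementation.
class Actor:
    def __init__(self, name: str):
        self.name = name

_current_actor: Actor | None = None


def current_actor() -> Actor:
    # any runtime-internal caller can resolve the process-local actor
    # instead of having it threaded through every call signature
    if _current_actor is None:
        raise RuntimeError('No actor runtime is up in this process!')
    return _current_actor


async def process_messages(chan) -> None:
    actor = current_actor()  # was: a required `actor: Actor` param
    ...
```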
```diff
@@ -1262,6 +1262,10 @@ async def async_main(
     the actor's "runtime" and all thus all ongoing RPC tasks.

     '''
+    # XXX NOTE, `_state._current_actor` **must** be set prior to
+    # calling this core runtime entrypoint!
+    assert actor is _state.current_actor()
+
     actor._task: trio.Task = trio.lowlevel.current_task()

     # attempt to retreive ``trio``'s sigint handler and stash it
@@ -1321,7 +1325,6 @@ async def async_main(
             ) as service_nursery,

             _server.open_ipc_server(
-                actor=actor,
                 parent_tn=service_nursery,
                 stream_handler_tn=service_nursery,
             ) as ipc_server,
@@ -1375,7 +1378,6 @@ async def async_main(
                 'Booting IPC server'
             )
             eps: list = await ipc_server.listen_on(
-                actor=actor,
                 accept_addrs=accept_addrs,
                 stream_handler_nursery=service_nursery,
             )
@@ -1460,8 +1462,7 @@ async def async_main(
             await root_nursery.start(
                 partial(
                     _rpc.process_messages,
-                    actor,
-                    actor._parent_chan,
+                    chan=actor._parent_chan,
                     shield=True,
                 )
             )
```
```diff
@@ -37,6 +37,9 @@ from .fault_simulation import (
 )


+# TODO, use dulwhich for this instead?
+# -> we're going to likely need it (or something similar)
+#   for supporting hot-coad reload feats eventually anyway!
 def repodir() -> pathlib.Path:
     '''
     Return the abspath to the repo directory.
```
```diff
@@ -0,0 +1,70 @@
+# tractor: structured concurrent "actors".
+# Copyright 2018-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+Random IPC addr generation for isolating
+the discovery space between test sessions.
+
+Might be eventually useful to expose as a util set from
+our `tractor.discovery` subsys?
+
+'''
+import random
+from typing import (
+    Type,
+)
+from tractor import (
+    _addr,
+)
+
+
+def get_rando_addr(
+    tpt_proto: str,
+    *,
+
+    # choose random port at import time
+    _rando_port: str = random.randint(1000, 9999)
+
+) -> tuple[str, str|int]:
+    '''
+    Used to globally override the runtime to the
+    per-test-session-dynamic addr so that all tests never conflict
+    with any other actor tree using the default.
+
+    '''
+    addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto]
+    def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
+
+    # this is the "unwrapped" form expected to be passed to
+    # `.open_root_actor()` by test body.
+    testrun_reg_addr: tuple[str, int|str]
+    match tpt_proto:
+        case 'tcp':
+            testrun_reg_addr = (
+                addr_type.def_bindspace,
+                _rando_port,
+            )
+
+        # NOTE, file-name uniqueness (no-collisions) will be based on
+        # the runtime-directory and root (pytest-proc's) pid.
+        case 'uds':
+            testrun_reg_addr = addr_type.get_random().unwrap()
+
+    # XXX, as sanity it should never the same as the default for the
+    # host-singleton registry actor.
+    assert def_reg_addr != testrun_reg_addr
+    return testrun_reg_addr
```
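A usage sketch for the new helper: a test (or app) can isolate its discovery space by registering on the random per-session address instead of the transport's default. This assumes a recent `tractor` where `open_root_actor()` accepts a `registry_addrs` list of unwrapped addrs; treat the exact kwarg as an assumption rather than a guaranteed API:

```python
# Sketch: run a root actor against an isolated, randomized registry
# addr so concurrent test sessions can't cross-talk. Assumes
# `tractor.open_root_actor(registry_addrs=...)` as in recent versions.
import tractor
import trio
from tractor._testing.addr import get_rando_addr


async def main():
    reg_addr: tuple = get_rando_addr(tpt_proto='tcp')
    async with tractor.open_root_actor(
        registry_addrs=[reg_addr],  # never the proto's default addr
    ):
        ...  # test body runs against the isolated registry

trio.run(main)
```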
```diff
@@ -26,29 +26,46 @@ from functools import (
 import inspect
 import platform

+import pytest
 import tractor
 import trio


 def tractor_test(fn):
     '''
-    Decorator for async test funcs to present them as "native"
-    looking sync funcs runnable by `pytest` using `trio.run()`.
-
-    Use:
-
-    @tractor_test
-    async def test_whatever():
-        await ...
-
-    If fixtures:
-
-      - ``reg_addr`` (a socket addr tuple where arbiter is listening)
-      - ``loglevel`` (logging level passed to tractor internals)
-      - ``start_method`` (subprocess spawning backend)
-
-    are defined in the `pytest` fixture space they will be automatically
-    injected to tests declaring these funcargs.
+    Decorator for async test fns to decorator-wrap them as "native"
+    looking sync funcs runnable by `pytest` and auto invoked with
+    `trio.run()` (much like the `pytest-trio` plugin's approach).
+
+    Further the test fn body will be invoked AFTER booting the actor
+    runtime, i.e. from inside a `tractor.open_root_actor()` block AND
+    with various runtime and tooling parameters implicitly passed as
+    requested by by the test session's config; see immediately below.
+
+    Basic deco use:
+    ---------------
+
+      @tractor_test
+      async def test_whatever():
+          await ...
+
+
+    Runtime config via special fixtures:
+    ------------------------------------
+    If any of the following fixture are requested by the wrapped test
+    fn (via normal func-args declaration),
+
+    - `reg_addr` (a socket addr tuple where arbiter is listening)
+    - `loglevel` (logging level passed to tractor internals)
+    - `start_method` (subprocess spawning backend)
+
+    (TODO support)
+    - `tpt_proto` (IPC transport protocol key)
+
+    they will be automatically injected to each test as normally
+    expected as well as passed to the initial
+    `tractor.open_root_actor()` funcargs.
+
     '''
     @wraps(fn)
     def wrapper(
```
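A sketch of the decorator in use, based purely on the (reworded) docstring above: the async test body runs inside `trio.run()` AND a booted actor runtime, and declaring `reg_addr`/`loglevel`/`start_method` funcargs pulls in the matching pytest fixtures which are also forwarded to `tractor.open_root_actor()`:

```python
# Example test per the docstring above; the child-spawn body is a
# sketch using tractor's public nursery API (`start_actor()` /
# `Portal.cancel_actor()`), not code from this changeset.
import tractor
from tractor._testing import tractor_test


@tractor_test
async def test_spawns_a_child(
    reg_addr: tuple,     # injected fixture, also passed to the runtime
    start_method: str,   # subprocess spawning backend key
):
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'child',
            enable_modules=[__name__],
        )
        await portal.cancel_actor()
```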
```diff
@@ -111,3 +128,164 @@ def tractor_test(fn):
         return trio.run(main)

     return wrapper


+def pytest_addoption(
+    parser: pytest.Parser,
+):
+    # parser.addoption(
+    #     "--ll",
+    #     action="store",
+    #     dest='loglevel',
+    #     default='ERROR', help="logging level to set when testing"
+    # )
+
+    parser.addoption(
+        "--spawn-backend",
+        action="store",
+        dest='spawn_backend',
+        default='trio',
+        help="Processing spawning backend to use for test run",
+    )
+
+    parser.addoption(
+        "--tpdb",
+        "--debug-mode",
+        action="store_true",
+        dest='tractor_debug_mode',
+        # default=False,
+        help=(
+            'Enable a flag that can be used by tests to to set the '
+            '`debug_mode: bool` for engaging the internal '
+            'multi-proc debugger sys.'
+        ),
+    )
+
+    # provide which IPC transport protocols opting-in test suites
+    # should accumulatively run against.
+    parser.addoption(
+        "--tpt-proto",
+        nargs='+',  # accumulate-multiple-args
+        action="store",
+        dest='tpt_protos',
+        default=['tcp'],
+        help="Transport protocol to use under the `tractor.ipc.Channel`",
+    )
+
+
+def pytest_configure(config):
+    backend = config.option.spawn_backend
+    tractor._spawn.try_set_start_method(backend)
+
+
+@pytest.fixture(scope='session')
+def debug_mode(request) -> bool:
+    '''
+    Flag state for whether `--tpdb` (for `tractor`-py-debugger)
+    was passed to the test run.
+
+    Normally tests should pass this directly to `.open_root_actor()`
+    to allow the user to opt into suite-wide crash handling.
+
+    '''
+    debug_mode: bool = request.config.option.tractor_debug_mode
+    return debug_mode
+
+
+@pytest.fixture(scope='session')
+def spawn_backend(request) -> str:
+    return request.config.option.spawn_backend
+
+
+@pytest.fixture(scope='session')
+def tpt_protos(request) -> list[str]:
+
+    # allow quoting on CLI
+    proto_keys: list[str] = [
+        proto_key.replace('"', '').replace("'", "")
+        for proto_key in request.config.option.tpt_protos
+    ]
+
+    # ?TODO, eventually support multiple protos per test-sesh?
+    if len(proto_keys) > 1:
+        pytest.fail(
+            'We only support one `--tpt-proto <key>` atm!\n'
+        )
+
+    # XXX ensure we support the protocol by name via lookup!
+    for proto_key in proto_keys:
+        addr_type = tractor._addr._address_types[proto_key]
+        assert addr_type.proto_key == proto_key
+
+    yield proto_keys
+
+
+@pytest.fixture(
+    scope='session',
+    autouse=True,
+)
+def tpt_proto(
+    tpt_protos: list[str],
+) -> str:
+    proto_key: str = tpt_protos[0]
+
+    from tractor import _state
+    if _state._def_tpt_proto != proto_key:
+        _state._def_tpt_proto = proto_key
+
+    yield proto_key
+
+
+@pytest.fixture(scope='session')
+def reg_addr(
+    tpt_proto: str,
+) -> tuple[str, int|str]:
+    '''
+    Deliver a test-sesh unique registry address such
+    that each run's (tests which use this fixture) will
+    have no conflicts/cross-talk when running simultaneously
+    nor will interfere with other live `tractor` apps active
+    on the same network-host (namespace).
+
+    '''
+    from tractor._testing.addr import get_rando_addr
+    return get_rando_addr(
+        tpt_proto=tpt_proto,
+    )
+
+
+def pytest_generate_tests(
+    metafunc: pytest.Metafunc,
+):
+    spawn_backend: str = metafunc.config.option.spawn_backend
+
+    if not spawn_backend:
+        # XXX some weird windows bug with `pytest`?
+        spawn_backend = 'trio'
+
+    # TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
+    assert spawn_backend in (
+        'mp_spawn',
+        'mp_forkserver',
+        'trio',
+    )
+
+    # NOTE: used-to-be-used-to dyanmically parametrize tests for when
+    # you just passed --spawn-backend=`mp` on the cli, but now we expect
+    # that cli input to be manually specified, BUT, maybe we'll do
+    # something like this again in the future?
+    if 'start_method' in metafunc.fixturenames:
+        metafunc.parametrize(
+            "start_method",
+            [spawn_backend],
+            scope='module',
+        )
+
+    # TODO, parametrize any `tpt_proto: str` declaring tests!
+    # proto_tpts: list[str] = metafunc.config.option.proto_tpts
+    # if 'tpt_proto' in metafunc.fixturenames:
+    #     metafunc.parametrize(
+    #         'tpt_proto',
+    #         proto_tpts,  # TODO, double check this list usage!
+    #         scope='module',
+    #     )
```
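With the hooks moved into the plugin, any consuming suite gets these CLI flags for free. A test run can also be driven programmatically, equivalent to the shell invocation `pytest --spawn-backend=trio --tpdb --tpt-proto uds tests/` (the flag names come straight from the hunk above):

```python
# Programmatic equivalent of passing the plugin-provided flags on the
# command line; `pytest.main()` is the standard in-process entrypoint.
import pytest

pytest.main([
    '--spawn-backend=trio',  # process spawning backend (default 'trio')
    '--tpdb',                # sets the session `debug_mode` fixture flag
    '--tpt-proto', 'uds',    # IPC transport under `tractor.ipc.Channel`
    'tests/',
])
```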
```diff
@@ -3280,7 +3280,7 @@ def open_crash_handler(

 @cm
 def maybe_open_crash_handler(
-    pdb: bool = False,
+    pdb: bool|None = None,
     tb_hide: bool = True,

     **kwargs,
@@ -3291,7 +3291,11 @@ def maybe_open_crash_handler(

     Normally this is used with CLI endpoints such that if the --pdb
     flag is passed the pdb REPL is engaed on any crashes B)

     '''
+    if pdb is None:
+        pdb: bool = _state.is_debug_mode()
+
     __tracebackhide__: bool = tb_hide

     rtctx = nullcontext(
```
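The `pdb: bool = False` to `pdb: bool|None = None` change is the classic `None`-sentinel default: an unset arg now falls back to the runtime's global debug-mode state while an explicit bool still wins. A self-contained sketch of the pattern, where `is_debug_mode()` stands in for `tractor._state.is_debug_mode()`:

```python
# Sketch of the `None`-sentinel default introduced above: `None` means
# "defer to global runtime state", an explicit bool overrides it.
_DEBUG_MODE: bool = False


def is_debug_mode() -> bool:
    # stand-in for `tractor._state.is_debug_mode()`
    return _DEBUG_MODE


def maybe_crash_handler(pdb: bool | None = None) -> str:
    if pdb is None:
        pdb = is_debug_mode()  # caller didn't decide -> use global flag
    return 'repl' if pdb else 'noop'


assert maybe_crash_handler() == 'noop'          # falls back to global
assert maybe_crash_handler(pdb=True) == 'repl'  # explicit arg wins
```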
```diff
@@ -247,3 +247,104 @@ def pformat_cs(
         +
         fields
     )


+# TODO: move this func to some kinda `.devx.pformat.py` eventually
+# as we work out our multi-domain state-flow-syntax!
+def nest_from_op(
+    input_op: str,
+    #
+    # ?TODO? an idea for a syntax to the state of concurrent systems
+    # as a "3-domain" (execution, scope, storage) model and using
+    # a minimal ascii/utf-8 operator-set.
+    #
+    # try not to take any of this seriously yet XD
+    #
+    # > is a "play operator" indicating (CPU bound)
+    #   exec/work/ops required at the "lowest level computing"
+    #
+    # execution primititves (tasks, threads, actors..) denote their
+    # lifetime with '(' and ')' since parentheses normally are used
+    # in many langs to denote function calls.
+    #
+    # starting = (
+    # >( opening/starting; beginning of the thread-of-exec (toe?)
+    # (> opened/started, (finished spawning toe)
+    # |_<Task: blah blah..> repr of toe, in py these look like <objs>
+    #
+    # >) closing/exiting/stopping,
+    # )> closed/exited/stopped,
+    # |_<Task: blah blah..>
+    #   [OR <), )< ?? ]
+    #
+    # ending = )
+    # >c) cancelling to close/exit
+    # c)> cancelled (caused close), OR?
+    #  |_<Actor: ..>
+    #   OR maybe "<c)" which better indicates the cancel being
+    #   "delivered/returned" / returned" to LHS?
+    #
+    # >x) erroring to eventuall exit
+    # x)> errored and terminated
+    #  |_<Actor: ...>
+    #
+    # scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
+    # >{ opening
+    # {> opened
+    # }> closed
+    # >} closing
+    #
+    # storage: like queues, shm-buffers, files, etc..
+    # >[ opening
+    # [> opened
+    #  |_<FileObj: ..>
+    #
+    # >] closing
+    # ]> closed
+
+    # IPC ops: channels, transports, msging
+    # => req msg
+    # <= resp msg
+    # <=> 2-way streaming (of msgs)
+    # <- recv 1 msg
+    # -> send 1 msg
+    #
+    # TODO: still not sure on R/L-HS approach..?
+    # =>( send-req to exec start (task, actor, thread..)
+    # (<= recv-req to ^
+    #
+    # (<= recv-req ^
+    # <=( recv-resp opened remote exec primitive
+    # <=) recv-resp closed
+    #
+    # )<=c req to stop due to cancel
+    # c=>) req to stop due to cancel
+    #
+    # =>{ recv-req to open
+    # <={ send-status that it closed
+
+    tree_str: str,
+
+    # NOTE: so move back-from-the-left of the `input_op` by
+    # this amount.
+    back_from_op: int = 0,
+) -> str:
+    '''
+    Depth-increment the input (presumably hierarchy/supervision)
+    input "tree string" below the provided `input_op` execution
+    operator, so injecting a `"\n|_{input_op}\n"`and indenting the
+    `tree_str` to nest content aligned with the ops last char.
+
+    '''
+    return (
+        f'{input_op}\n'
+        +
+        textwrap.indent(
+            tree_str,
+            prefix=(
+                len(input_op)
+                -
+                (back_from_op + 1)
+            ) * ' ',
+        )
+    )
```
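A worked example of the relocated helper: the op header is emitted on its own line and the tree repr is indented so its `|_` branch lines up under the op's last char, shifted left by `back_from_op`. The function below is a verbatim-equivalent local copy so the snippet runs standalone:

```python
# Standalone copy of `nest_from_op()` as added above, plus a call
# showing the resulting layout.
import textwrap


def nest_from_op(
    input_op: str,
    tree_str: str,
    back_from_op: int = 0,
) -> str:
    return (
        f'{input_op}\n'
        + textwrap.indent(
            tree_str,
            prefix=(len(input_op) - (back_from_op + 1)) * ' ',
        )
    )


print(nest_from_op('>(', '|_<Actor: name@host>\n'))
# >(
#  |_<Actor: name@host>
```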
```diff
@@ -72,11 +72,223 @@ if TYPE_CHECKING:
 log = log.get_logger(__name__)


+async def maybe_wait_on_canced_subs(
+    uid: tuple[str, str],
+    chan: Channel,
+    disconnected: bool,
+
+    actor: Actor|None = None,
+    chan_drain_timeout: float = 0.5,
+    an_exit_timeout: float = 0.5,
+
+) -> ActorNursery|None:
+    '''
+    When a process-local actor-nursery is found for the given actor
+    `uid` (i.e. that peer is **also** a subactor of this parent), we
+    attempt to (with timeouts) wait on,
+
+    - all IPC msgs to drain on the (common) `Channel` such that all
+      local `Context`-parent-tasks can also gracefully collect
+      `ContextCancelled` msgs from their respective remote children
+      vs. a `chan_drain_timeout`.
+
+    - the actor-nursery to cancel-n-join all its supervised children
+      (processes) *gracefully* vs. a `an_exit_timeout` and thus also
+      detect cases where the IPC transport connection broke but
+      a sub-process is detected as still alive (a case that happens
+      when the subactor is still in an active debugger REPL session).
+
+    If the timeout expires in either case we ofc report with warning.
+
+    '''
+    actor = actor or _state.current_actor()
+
+    # XXX running outside actor-runtime usage,
+    # - unit testing
+    # - possibly manual usage (eventually) ?
+    if not actor:
+        return None
+
+    local_nursery: (
+        ActorNursery|None
+    ) = actor._actoruid2nursery.get(uid)
+
+    # This is set in `Portal.cancel_actor()`. So if
+    # the peer was cancelled we try to wait for them
+    # to tear down their side of the connection before
+    # moving on with closing our own side.
+    if (
+        local_nursery
+        and (
+            actor._cancel_called
+            or
+            chan._cancel_called
+        )
+        #
+        # ^-TODO-^ along with this is there another condition
+        #          that we should filter with to avoid entering this
+        #          waiting block needlessly?
+        # -[ ] maybe `and local_nursery.cancelled` and/or
+        #     only if the `._children` table is empty or has
+        #     only `Portal`s with .chan._cancel_called ==
+        #     True` as per what we had below; the MAIN DIFF
+        #     BEING that just bc one `Portal.cancel_actor()`
+        #     was called, doesn't mean the whole actor-nurse
+        #     is gonna exit any time soon right!?
+        #
+        # or
+        # all(chan._cancel_called for chan in chans)
+
+    ):
+        log.cancel(
+            'Waiting on cancel request to peer..\n'
+            f'c)=>\n'
+            f'  |_{chan.uid}\n'
+        )
+
+        # XXX: this is a soft wait on the channel (and its
+        # underlying transport protocol) to close from the
+        # remote peer side since we presume that any channel
+        # which is mapped to a sub-actor (i.e. it's managed
+        # by local actor-nursery) has a message that is sent
+        # to the peer likely by this actor (which may be in
+        # a shutdown sequence due to cancellation) when the
+        # local runtime here is now cancelled while
+        # (presumably) in the middle of msg loop processing.
+        chan_info: str = (
+            f'{chan.uid}\n'
+            f'|_{chan}\n'
+            f'  |_{chan.transport}\n\n'
+        )
+        with trio.move_on_after(chan_drain_timeout) as drain_cs:
+            drain_cs.shield = True
+
+            # attempt to wait for the far end to close the
+            # channel and bail after timeout (a 2-generals
+            # problem on closure).
+            assert chan.transport
+            async for msg in chan.transport.drain():
+
+                # try to deliver any lingering msgs
+                # before we destroy the channel.
+                # This accomplishes deterministic
+                # ``Portal.cancel_actor()`` cancellation by
+                # making sure any RPC response to that call is
+                # delivered the local calling task.
+                # TODO: factor this into a helper?
+                log.warning(
+                    'Draining msg from disconnected peer\n'
+                    f'{chan_info}'
+                    f'{pformat(msg)}\n'
+                )
+                # cid: str|None = msg.get('cid')
+                cid: str|None = msg.cid
+                if cid:
+                    # deliver response to local caller/waiter
+                    await actor._deliver_ctx_payload(
+                        chan,
+                        cid,
+                        msg,
+                    )
+        if drain_cs.cancelled_caught:
+            log.warning(
+                'Timed out waiting on IPC transport channel to drain?\n'
+                f'{chan_info}'
+            )
+
+        # XXX NOTE XXX when no explicit call to
+        # `open_root_actor()` was made by the application
+        # (normally we implicitly make that call inside
+        # the first `.open_nursery()` in root-actor
+        # user/app code), we can assume that either we
+        # are NOT the root actor or are root but the
+        # runtime was started manually. and thus DO have
+        # to wait for the nursery-enterer to exit before
+        # shutting down the local runtime to avoid
+        # clobbering any ongoing subactor
+        # teardown/debugging/graceful-cancel.
+        #
+        # see matching note inside `._supervise.open_nursery()`
+        #
+        # TODO: should we have a separate cs + timeout
+        # block here?
+        if (
+            # XXX SO either,
+            #  - not root OR,
+            #  - is root but `open_root_actor()` was
+            #    entered manually (in which case we do
+            #    the equiv wait there using the
+            #    `devx.debug` sub-sys APIs).
+            not local_nursery._implicit_runtime_started
+        ):
+            log.runtime(
+                'Waiting on local actor nursery to exit..\n'
+                f'|_{local_nursery}\n'
+            )
+            with trio.move_on_after(an_exit_timeout) as an_exit_cs:
+                an_exit_cs.shield = True
+                await local_nursery.exited.wait()
+
+            # TODO: currently this is always triggering for every
+            # sub-daemon spawned from the `piker.services._mngr`?
+            # -[ ] how do we ensure that the IPC is supposed to
+            #      be long lived and isn't just a register?
+            # |_ in the register case how can we signal that the
+            #    ephemeral msg loop was intentional?
+            if (
+                # not local_nursery._implicit_runtime_started
+                # and
+                an_exit_cs.cancelled_caught
+            ):
+                report: str = (
+                    'Timed out waiting on local actor-nursery to exit?\n'
+                    f'c)>\n'
+                    f' |_{local_nursery}\n'
+                )
+                if children := local_nursery._children:
+                    # indent from above local-nurse repr
+                    report += (
+                        f'   |_{pformat(children)}\n'
+                    )
+
+                log.warning(report)
+
+        if disconnected:
+            # if the transport died and this actor is still
+            # registered within a local nursery, we report
+            # that the IPC layer may have failed
+            # unexpectedly since it may be the cause of
+            # other downstream errors.
+            entry: tuple|None = local_nursery._children.get(uid)
+            if entry:
+                proc: trio.Process
+                _, proc, _ = entry
+
+                if (
+                    (poll := getattr(proc, 'poll', None))
+                    and
+                    poll() is None  # proc still alive
+                ):
+                    # TODO: change log level based on
+                    # detecting whether chan was created for
+                    # ephemeral `.register_actor()` request!
+                    # -[ ] also, that should be avoidable by
+                    #   re-using any existing chan from the
+                    #   `._discovery.get_registry()` call as
+                    #   well..
+                    log.runtime(
+                        f'Peer IPC broke but subproc is alive?\n\n'
+
+                        f'<=x {chan.uid}@{chan.raddr}\n'
+                        f'   |_{proc}\n'
+                    )
+
+    return local_nursery
+
+
 # TODO multi-tpt support with per-proto peer tracking?
 #
 # -[x] maybe change to mod-func and rename for implied
 #     multi-transport semantics?
-#
 # -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
 #     so that we can query per tpt all peer contact infos?
 #  |_[ ] possibly provide a global viewing via a
```
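Both waits in the factored-out helper (`chan_drain_timeout` and `an_exit_timeout`) use the same `trio` idiom: a shielded `move_on_after()` scope, so an outer cancellation cannot abort the grace period, but the wait still gives up (and merely warns) once the deadline passes. A minimal runnable sketch of that idiom:

```python
# Minimal sketch of the shielded-timeout wait idiom used twice above:
# shield the scope so enclosing cancel scopes can't cut the grace
# period short, then check `cancelled_caught` to detect a timeout.
import trio


async def graceful_wait(evt: trio.Event, timeout: float = 0.5) -> bool:
    with trio.move_on_after(timeout) as cs:
        cs.shield = True  # survive enclosing cancellation
        await evt.wait()
    if cs.cancelled_caught:
        print('timed out waiting; proceeding with teardown anyway')
    return not cs.cancelled_caught


async def main():
    evt = trio.Event()  # never set -> the wait must time out
    assert not await graceful_wait(evt, timeout=0.1)

trio.run(main)
```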
@ -87,7 +299,6 @@ async def handle_stream_from_peer(
|
||||||
|
|
||||||
*,
|
*,
|
||||||
server: IPCServer,
|
server: IPCServer,
|
||||||
actor: Actor,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -119,9 +330,10 @@ async def handle_stream_from_peer(
|
||||||
|
|
||||||
# initial handshake with peer phase
|
# initial handshake with peer phase
|
||||||
try:
|
try:
|
||||||
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
if actor := _state.current_actor():
|
||||||
aid=actor.aid,
|
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||||
)
|
aid=actor.aid,
|
||||||
|
)
|
||||||
except (
|
except (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||||
|
@ -222,8 +434,7 @@ async def handle_stream_from_peer(
|
||||||
disconnected,
|
disconnected,
|
||||||
last_msg,
|
last_msg,
|
||||||
) = await _rpc.process_messages(
|
) = await _rpc.process_messages(
|
||||||
actor,
|
chan=chan,
|
||||||
chan,
|
|
||||||
)
|
)
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
@ -234,179 +445,16 @@ async def handle_stream_from_peer(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
local_nursery: (
|
|
||||||
ActorNursery|None
|
|
||||||
) = actor._actoruid2nursery.get(uid)
|
|
||||||
|
|
||||||
# This is set in ``Portal.cancel_actor()``. So if
|
# check if there are subs which we should gracefully join at
|
||||||
# the peer was cancelled we try to wait for them
|
# both the inter-actor-task and subprocess levels to
|
||||||
# to tear down their side of the connection before
|
# gracefully remote cancel and later disconnect (particularly
|
||||||
# moving on with closing our own side.
|
# for permitting subs engaged in active debug-REPL sessions).
|
||||||
if (
|
local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
|
||||||
local_nursery
|
uid=uid,
|
||||||
and (
|
chan=chan,
|
||||||
actor._cancel_called
|
disconnected=disconnected,
|
||||||
or
|
)
|
||||||
chan._cancel_called
|
|
||||||
)
|
|
||||||
#
|
|
||||||
# ^-TODO-^ along with this is there another condition
|
|
||||||
# that we should filter with to avoid entering this
|
|
||||||
# waiting block needlessly?
|
|
||||||
# -[ ] maybe `and local_nursery.cancelled` and/or
|
|
||||||
# only if the `._children` table is empty or has
|
|
||||||
# only `Portal`s with .chan._cancel_called ==
|
|
||||||
# True` as per what we had below; the MAIN DIFF
|
|
||||||
# BEING that just bc one `Portal.cancel_actor()`
|
|
||||||
# was called, doesn't mean the whole actor-nurse
|
|
||||||
# is gonna exit any time soon right!?
|
|
||||||
#
|
|
||||||
# or
|
|
||||||
# all(chan._cancel_called for chan in chans)
|
|
||||||
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
'Waiting on cancel request to peer..\n'
|
|
||||||
f'c)=>\n'
|
|
||||||
f' |_{chan.uid}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: this is a soft wait on the channel (and its
|
|
||||||
# underlying transport protocol) to close from the
|
|
||||||
# remote peer side since we presume that any channel
|
|
||||||
# which is mapped to a sub-actor (i.e. it's managed
|
|
||||||
# by local actor-nursery) has a message that is sent
|
|
||||||
# to the peer likely by this actor (which may be in
|
|
||||||
# a shutdown sequence due to cancellation) when the
|
|
||||||
# local runtime here is now cancelled while
|
|
||||||
# (presumably) in the middle of msg loop processing.
|
|
||||||
chan_info: str = (
|
|
||||||
f'{chan.uid}\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
f' |_{chan.transport}\n\n'
|
|
||||||
)
|
|
||||||
with trio.move_on_after(0.5) as drain_cs:
|
|
||||||
drain_cs.shield = True
|
|
||||||
|
|
||||||
# attempt to wait for the far end to close the
|
|
||||||
# channel and bail after timeout (a 2-generals
|
|
||||||
# problem on closure).
|
|
||||||
assert chan.transport
|
|
||||||
async for msg in chan.transport.drain():
|
|
||||||
|
|
||||||
# try to deliver any lingering msgs
|
|
||||||
# before we destroy the channel.
|
|
||||||
# This accomplishes deterministic
|
|
||||||
# ``Portal.cancel_actor()`` cancellation by
|
|
||||||
# making sure any RPC response to that call is
|
|
||||||
# delivered the local calling task.
|
|
||||||
# TODO: factor this into a helper?
|
|
||||||
log.warning(
|
|
||||||
'Draining msg from disconnected peer\n'
|
|
||||||
f'{chan_info}'
|
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
|
||||||
# cid: str|None = msg.get('cid')
|
|
||||||
cid: str|None = msg.cid
|
|
||||||
if cid:
|
|
||||||
# deliver response to local caller/waiter
|
|
||||||
await actor._deliver_ctx_payload(
|
|
||||||
chan,
|
|
||||||
cid,
|
|
||||||
msg,
|
|
||||||
)
|
|
||||||
if drain_cs.cancelled_caught:
|
|
||||||
log.warning(
|
|
||||||
'Timed out waiting on IPC transport channel to drain?\n'
|
|
||||||
f'{chan_info}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX NOTE XXX when no explicit call to
|
|
||||||
# `open_root_actor()` was made by the application
|
|
||||||
# (normally we implicitly make that call inside
|
|
||||||
# the first `.open_nursery()` in root-actor
|
|
||||||
# user/app code), we can assume that either we
|
|
||||||
# are NOT the root actor or are root but the
|
|
||||||
# runtime was started manually. and thus DO have
|
|
||||||
# to wait for the nursery-enterer to exit before
|
|
||||||
# shutting down the local runtime to avoid
|
|
||||||
# clobbering any ongoing subactor
|
|
||||||
# teardown/debugging/graceful-cancel.
|
|
||||||
#
|
|
||||||
# see matching note inside `._supervise.open_nursery()`
|
|
||||||
#
|
|
||||||
# TODO: should we have a separate cs + timeout
|
|
||||||
# block here?
|
|
||||||
if (
|
|
||||||
# XXX SO either,
|
|
||||||
# - not root OR,
|
|
||||||
# - is root but `open_root_actor()` was
|
|
||||||
# entered manually (in which case we do
|
|
||||||
# the equiv wait there using the
|
|
||||||
# `devx._debug` sub-sys APIs).
|
|
||||||
not local_nursery._implicit_runtime_started
|
|
||||||
):
|
|
||||||
log.runtime(
|
|
||||||
'Waiting on local actor nursery to exit..\n'
|
|
||||||
f'|_{local_nursery}\n'
|
|
||||||
)
|
|
||||||
with trio.move_on_after(0.5) as an_exit_cs:
|
|
||||||
an_exit_cs.shield = True
|
|
||||||
await local_nursery.exited.wait()
|
|
||||||
|
|
||||||
# TODO: currently this is always triggering for every
|
|
||||||
# sub-daemon spawned from the `piker.services._mngr`?
|
|
||||||
# -[ ] how do we ensure that the IPC is supposed to
|
|
||||||
# be long lived and isn't just a register?
|
|
||||||
# |_ in the register case how can we signal that the
|
|
||||||
# ephemeral msg loop was intentional?
|
|
||||||
if (
|
|
||||||
# not local_nursery._implicit_runtime_started
|
|
||||||
# and
|
|
||||||
an_exit_cs.cancelled_caught
|
|
||||||
):
|
|
||||||
report: str = (
|
|
||||||
'Timed out waiting on local actor-nursery to exit?\n'
|
|
||||||
f'c)>\n'
|
|
||||||
f' |_{local_nursery}\n'
|
|
||||||
)
|
|
||||||
if children := local_nursery._children:
|
|
||||||
# indent from above local-nurse repr
|
|
||||||
report += (
|
|
||||||
f' |_{pformat(children)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
log.warning(report)
|
|
||||||
|
|
||||||
if disconnected:
|
|
||||||
# if the transport died and this actor is still
|
|
||||||
# registered within a local nursery, we report
|
|
||||||
# that the IPC layer may have failed
|
|
||||||
# unexpectedly since it may be the cause of
|
|
||||||
# other downstream errors.
|
|
||||||
entry: tuple|None = local_nursery._children.get(uid)
|
|
||||||
if entry:
|
|
||||||
proc: trio.Process
|
|
||||||
_, proc, _ = entry
|
|
||||||
|
|
||||||
if (
|
|
||||||
(poll := getattr(proc, 'poll', None))
|
|
||||||
and
|
|
||||||
poll() is None # proc still alive
|
|
||||||
):
|
|
||||||
# TODO: change log level based on
|
|
||||||
# detecting whether chan was created for
|
|
||||||
# ephemeral `.register_actor()` request!
|
|
||||||
# -[ ] also, that should be avoidable by
|
|
||||||
# re-using any existing chan from the
|
|
||||||
# `._discovery.get_registry()` call as
|
|
||||||
# well..
|
|
||||||
log.runtime(
|
|
||||||
f'Peer IPC broke but subproc is alive?\n\n'
|
|
||||||
|
|
||||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
|
||||||
f' |_{proc}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# ``Channel`` teardown and closure sequence
|
# ``Channel`` teardown and closure sequence
|
||||||
# drop ref to channel so it can be gc-ed and disconnected
|
# drop ref to channel so it can be gc-ed and disconnected
|
||||||
|
@@ -467,11 +515,11 @@ async def handle_stream_from_peer(
             # from broken debug TTY locking due to
             # msg-spec races on application using RunVar...
             if (
+                local_nursery
+                and
                 (ctx_in_debug := pdb_lock.ctx_in_debug)
                 and
                 (pdb_user_uid := ctx_in_debug.chan.uid)
-                and
-                local_nursery
             ):
                 entry: tuple|None = local_nursery._children.get(
                     tuple(pdb_user_uid)
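The predicate reorder above leans on `and` short-circuiting: with `local_nursery` checked first, the walrus-bound debug-lock lookups (and the `._children.get()` that follows) only run when a nursery actually exists. A tiny illustration with hypothetical stand-in names:

# `and` evaluates left-to-right and stops at the first falsy
# operand, so `lock.get()` never runs when `nursery` is None.
nursery = None
lock: dict = {'ctx': 'debug-ctx'}

if (
    nursery
    and
    (ctx := lock.get('ctx'))
):
    print(f'lock held by {ctx}')
else:
    print('no local nursery; lock lookup skipped')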
@@ -804,7 +852,6 @@ class IPCServer(Struct):
     async def listen_on(
         self,
         *,
-        actor: Actor,
         accept_addrs: list[tuple[str, int|str]]|None = None,
         stream_handler_nursery: Nursery|None = None,
     ) -> list[IPCEndpoint]:
@@ -837,20 +884,19 @@ class IPCServer(Struct):
             f'{self}\n'
         )

-        log.info(
+        log.runtime(
             f'Binding to endpoints for,\n'
             f'{accept_addrs}\n'
         )
         eps: list[IPCEndpoint] = await self._parent_tn.start(
             partial(
                 _serve_ipc_eps,
-                actor=actor,
                 server=self,
                 stream_handler_tn=stream_handler_nursery,
                 listen_addrs=accept_addrs,
             )
         )
-        log.info(
+        log.runtime(
             f'Started IPC endpoints\n'
             f'{eps}\n'
         )
@@ -873,7 +919,6 @@ class IPCServer(Struct):

 async def _serve_ipc_eps(
     *,
-    actor: Actor,
     server: IPCServer,
     stream_handler_tn: Nursery,
     listen_addrs: list[tuple[str, int|str]],
@@ -907,12 +952,13 @@ async def _serve_ipc_eps(
                 stream_handler_tn=stream_handler_tn,
             )
             try:
-                log.info(
+                log.runtime(
                     f'Starting new endpoint listener\n'
                     f'{ep}\n'
                 )
                 listener: trio.abc.Listener = await ep.start_listener()
                 assert listener is ep._listener
+                # actor = _state.current_actor()
                 # if actor.is_registry:
                 #     import pdbp; pdbp.set_trace()

@@ -937,7 +983,6 @@ async def _serve_ipc_eps(
                 handler=partial(
                     handle_stream_from_peer,
                     server=server,
-                    actor=actor,
                 ),
                 listeners=listeners,

@@ -948,13 +993,13 @@ async def _serve_ipc_eps(
                 )
             )
             # TODO, wow make this message better! XD
-            log.info(
+            log.runtime(
                 'Started server(s)\n'
                 +
                 '\n'.join([f'|_{addr}' for addr in listen_addrs])
             )

-            log.info(
+            log.runtime(
                 f'Started IPC endpoints\n'
                 f'{eps}\n'
             )
@@ -970,6 +1015,7 @@ async def _serve_ipc_eps(
                 ep.close_listener()
                 server._endpoints.remove(ep)

+    # actor = _state.current_actor()
     # if actor.is_arbiter:
     #     import pdbp; pdbp.set_trace()

@@ -980,7 +1026,6 @@ async def _serve_ipc_eps(

 @acm
 async def open_ipc_server(
-    actor: Actor,
     parent_tn: Nursery|None = None,
     stream_handler_tn: Nursery|None = None,

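The three hunks above all drop the explicitly-threaded `actor: Actor` parameter; the commented-out `_state.current_actor()` lines elsewhere in this diff suggest the server helpers can now recover the actor from ambient runtime state instead. A hedged sketch of that parameter-threading vs. context-lookup trade-off, using a plain `contextvars` stand-in rather than tractor's real `_state` module:

from contextvars import ContextVar

# Hypothetical stand-in for runtime-tracked "current actor" state.
_current_actor: ContextVar[str] = ContextVar('current_actor')

def serve_eps() -> str:
    # No `actor` argument: resolve it from ambient state so every
    # intermediate signature stays one parameter smaller.
    return f'serving endpoints for {_current_actor.get()}'

_current_actor.set('root')
print(serve_eps())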
@@ -127,6 +127,11 @@ async def start_listener(
     Start a TCP socket listener on the given `TCPAddress`.

     '''
+    log.info(
+        f'Attempting to bind TCP socket\n'
+        f'>[\n'
+        f'|_{addr}\n'
+    )
     # ?TODO, maybe we should just change the lower-level call this is
     # using internall per-listener?
     listeners: list[SocketListener] = await open_tcp_listeners(
@@ -140,6 +145,12 @@ async def start_listener(
     assert len(listeners) == 1
     listener = listeners[0]
     host, port = listener.socket.getsockname()[:2]
+
+    log.info(
+        f'Listening on TCP socket\n'
+        f'[>\n'
+        f' |_{addr}\n'
+    )
     return listener

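The added `log.info()` calls bracket the real work done by trio's `open_tcp_listeners()`. For reference, a self-contained sketch of binding a single TCP listener and reading back the OS-assigned port (this uses trio's public API directly; the address wrapper from the diff is omitted):

import trio

async def main():
    # Port 0 lets the OS pick; trio may hand back one listener per
    # resolved address, so grab the first.
    listeners = await trio.open_tcp_listeners(0, host='127.0.0.1')
    listener: trio.SocketListener = listeners[0]
    host, port = listener.socket.getsockname()[:2]
    print(f'listening on {host}:{port}')
    await listener.aclose()

trio.run(main)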
@@ -270,7 +270,9 @@ def get_logger(
     subsys_spec: str|None = None,

 ) -> StackLevelAdapter:
-    '''Return the package log or a sub-logger for ``name`` if provided.
+    '''
+    Return the `tractor`-library root logger or a sub-logger for
+    `name` if provided.
+
     '''
     log: Logger
@@ -282,7 +284,7 @@ def get_logger(
         name != _proj_name
     ):

-        # NOTE: for handling for modules that use ``get_logger(__name__)``
+        # NOTE: for handling for modules that use `get_logger(__name__)`
         # we make the following stylistic choice:
         # - always avoid duplicate project-package token
         # in msg output: i.e. tractor.tractor.ipc._chan.py in header
@@ -331,7 +333,7 @@ def get_logger(

 def get_console_log(
     level: str|None = None,
-    logger: Logger|None = None,
+    logger: Logger|StackLevelAdapter|None = None,
     **kwargs,

 ) -> LoggerAdapter:
@@ -344,12 +346,23 @@ def get_console_log(
     Yeah yeah, i know we can use `logging.config.dictConfig()`. You do it.

     '''
-    log = get_logger(
-        logger=logger,
-        **kwargs
-    )  # set a root logger
-    logger: Logger = log.logger
+    # get/create a stack-aware-adapter
+    if (
+        logger
+        and
+        isinstance(logger, StackLevelAdapter)
+    ):
+        # XXX, for ex. when passed in by a caller wrapping some
+        # other lib's logger instance with our level-adapter.
+        log = logger
+
+    else:
+        log: StackLevelAdapter = get_logger(
+            logger=logger,
+            **kwargs
+        )
+
+    logger: Logger|StackLevelAdapter = log.logger
     if not level:
         return log
@@ -367,10 +380,7 @@ def get_console_log(
             None,
         )
     ):
-        fmt = LOG_FORMAT
-        # if logger:
-        #     fmt = None
+        fmt: str = LOG_FORMAT  # always apply our format?

         handler = StreamHandler()
         formatter = colorlog.ColoredFormatter(
             fmt=fmt,
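With `logger` widened to `Logger|StackLevelAdapter|None`, a caller that already wrapped some logger in the stack-aware adapter can pass it straight through, and `get_console_log()` will reuse it instead of re-wrapping. A rough usage sketch based only on the signatures visible in this diff (exact import path and first-parameter name may vary by version):

from tractor.log import get_logger, get_console_log

# Build the stack-aware adapter once..
log = get_logger('my_subsys')

# ..then hand it back in; after this change it's detected via
# isinstance(..., StackLevelAdapter) and used as-is.
console_log = get_console_log(
    level='info',
    logger=log,
)
console_log.info('console logging enabled')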
@@ -31,4 +31,5 @@ from ._broadcast import (
 )
 from ._beg import (
     collapse_eg as collapse_eg,
+    maybe_collapse_eg as maybe_collapse_eg,
 )
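The newly exported `maybe_collapse_eg` presumably complements `collapse_eg` by unwrapping an exception-group only when that is possible. An illustrative guess at those semantics (not tractor's actual implementation), shown for a group with a single leaf exception:

# Illustrative only: collapse a single-child ExceptionGroup to its
# lone leaf, else return the group unchanged (requires py3.11+).
def maybe_collapse(beg: BaseExceptionGroup) -> BaseException:
    if len(beg.exceptions) == 1:
        return beg.exceptions[0]
    return beg

try:
    raise BaseExceptionGroup('one child', [ValueError('oops')])
except* ValueError as beg:
    print(repr(maybe_collapse(beg)))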