Merge pull request #411 from goodboy/tpt_tolerance
Tpt-tolerance: more lowlevel `trio` CRE/BRE -> `TransportClosed` translations

commit 70bb77280e
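For context, the core pattern this PR extends is translating the lowlevel `trio` closure errors (`ClosedResourceError`/`BrokenResourceError`, i.e. CRE/BRE) into `tractor`'s `TransportClosed` at the transport layer, so higher layers only have to handle one exception type. A minimal illustrative sketch follows; the `send_wrapped()` helper is made up for this example and the `TransportClosed(message=..., src_exc=...)` kwargs are simply mirrored from the call sites shown later in this diff (which also pass `loglevel=` and `raise_on_report=`):

import trio
from tractor import TransportClosed

async def send_wrapped(
    stream: trio.abc.SendStream,
    data: bytes,
) -> None:
    # sketch: catch the raw trio errors at the lowest layer and
    # re-raise the runtime's single "tpt closed" exc type.
    try:
        await stream.send_all(data)
    except (
        trio.ClosedResourceError,   # closed on our side
        trio.BrokenResourceError,   # broken/reset by the peer
    ) as src_err:
        raise TransportClosed(
            message='transport was already closed?',
            src_exc=src_err,
        ) from src_err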
@@ -17,6 +17,7 @@ from tractor import (
     MsgStream,
     _testing,
     trionics,
+    TransportClosed,
 )
 import trio
 import pytest
@@ -208,12 +209,16 @@ async def main(
             # TODO: is this needed or no?
             raise

-        except trio.ClosedResourceError:
+        except (
+            trio.ClosedResourceError,
+            TransportClosed,
+        ) as _tpt_err:
             # NOTE: don't send if we already broke the
             # connection to avoid raising a closed-error
             # such that we drop through to the ctl-c
             # mashing by user.
-            await trio.sleep(0.01)
+            with trio.CancelScope(shield=True):
+                await trio.sleep(0.01)

         # timeout: int = 1
         # with trio.move_on_after(timeout) as cs:
@@ -247,6 +252,7 @@ async def main(
                 await stream.send(i)
                 pytest.fail('stream not closed?')
             except (
+                TransportClosed,
                 trio.ClosedResourceError,
                 trio.EndOfChannel,
             ) as send_err:
@@ -4,6 +4,7 @@
 '''
 from __future__ import annotations
 import time
+import signal
 from typing import (
     Callable,
     TYPE_CHECKING,
@@ -69,12 +70,15 @@ def spawn(
     import os
     os.environ['PYTHON_COLORS'] = '0'

+    spawned: PexpectSpawner|None = None
+
     def _spawn(
         cmd: str,
         **mkcmd_kwargs,
     ) -> pty_spawn.spawn:
+        nonlocal spawned
         unset_colors()
-        return testdir.spawn(
+        spawned = testdir.spawn(
             cmd=mk_cmd(
                 cmd,
                 **mkcmd_kwargs,
@@ -84,9 +88,35 @@ def spawn(
             # ^TODO? get `pytest` core to expose underlying
             # `pexpect.spawn()` stuff?
         )
+        return spawned

     # such that test-dep can pass input script name.
-    return _spawn  # the `PexpectSpawner`, type alias.
+    yield _spawn  # the `PexpectSpawner`, type alias.
+
+    if (
+        spawned
+        and
+        (ptyproc := spawned.ptyproc)
+    ):
+        start: float = time.time()
+        timeout: float = 5
+        while (
+            ptyproc.isalive()
+            and
+            (
+                (_time_took := (time.time() - start))
+                <
+                timeout
+            )
+        ):
+            ptyproc.kill(signal.SIGINT)
+            time.sleep(0.01)
+
+        if ptyproc.isalive():
+            ptyproc.kill(signal.SIGKILL)
+
+        # TODO? ensure we've cleaned up any UDS-paths?
+        # breakpoint()


 @pytest.fixture(
@@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
         ['peer IPC channel closed abruptly?',
          'another task closed this fd',
          'Debug lock request was CANCELLED?',
-         "TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
+         "'MsgpackUDSStream' was already closed locally?",
+         "TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
+         # ?TODO^? match depending on `tpt_proto(s)`?
+        ]

         # XXX races on whether these show/hit?
         # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',
@@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream(
         expect_final_exc = TransportClosed

     mod: ModuleType = import_path(
-        examples_dir() / 'advanced_faults'
+        examples_dir()
+        / 'advanced_faults'
         / 'ipc_failure_during_stream.py',
         root=examples_dir(),
         consider_namespace_packages=False,
@@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream(
     if (
         # only expect EoC if trans is broken on the child side,
         ipc_break['break_child_ipc_after'] is not False
+        and
         # AND we tell the child to call `MsgStream.aclose()`.
-        and pre_aclose_msgstream
+        pre_aclose_msgstream
     ):
         # expect_final_exc = trio.EndOfChannel
         # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream(
         ipc_break['break_child_ipc_after'] is not False
         and (
             ipc_break['break_parent_ipc_after']
-            > ipc_break['break_child_ipc_after']
+            >
+            ipc_break['break_child_ipc_after']
         )
     ):
         if pre_aclose_msgstream:
@@ -248,8 +251,15 @@ def test_ipc_channel_break_during_stream(
     # get raw instance from pytest wrapper
     value = excinfo.value
     if isinstance(value, ExceptionGroup):
-        excs = value.exceptions
-        assert len(excs) == 1
+        excs: tuple[Exception] = value.exceptions
+        assert (
+            len(excs) <= 2
+            and
+            all(
+                isinstance(exc, TransportClosed)
+                for exc in excs
+            )
+        )
         final_exc = excs[0]
         assert isinstance(final_exc, expect_final_exc)
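Aside: on Python 3.11+ the same "everything in the group is a transport-closure" check can also be expressed via `ExceptionGroup.subgroup()`; a small sketch (not part of this diff, the helper name is made up):

from tractor import TransportClosed

def only_tpt_closed(eg: ExceptionGroup) -> bool:
    # True iff every leaf exc in the group is a TransportClosed;
    # .subgroup() returns None when no leaf matches the predicate.
    non_tpt = eg.subgroup(lambda exc: not isinstance(exc, TransportClosed))
    return non_tpt is None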
@@ -11,12 +11,13 @@ import trio
 import tractor
 from tractor import ( # typing
     Actor,
-    current_actor,
-    open_nursery,
-    Portal,
     Context,
     ContextCancelled,
+    MsgStream,
+    Portal,
     RemoteActorError,
+    current_actor,
+    open_nursery,
 )
 from tractor._testing import (
     # tractor_test,
@@ -796,8 +797,8 @@ async def basic_echo_server(

 ) -> None:
     '''
-    Just the simplest `MsgStream` echo server which resays what
-    you told it but with its uid in front ;)
+    Just the simplest `MsgStream` echo server which resays what you
+    told it but with its uid in front ;)

     '''
     actor: Actor = tractor.current_actor()
@@ -966,9 +967,14 @@ async def tell_little_bro(

     caller: str = '',
     err_after: float|None = None,
-    rng_seed: int = 50,
+    rng_seed: int = 100,
+    # NOTE, ensure ^ is large enough (on fast hw anyway)
+    # to ensure the peer cancel req arrives before the
+    # echoing dialog does itself Bp
 ):
     # contact target actor, do a stream dialog.
+    lb: Portal
+    echo_ipc: MsgStream
     async with (
         tractor.wait_for_actor(
             name=actor_name
@@ -983,7 +989,6 @@ async def tell_little_bro(
                 else None
             ),
         ) as (sub_ctx, first),
-
         sub_ctx.open_stream() as echo_ipc,
     ):
         actor: Actor = current_actor()
@@ -994,6 +999,7 @@ async def tell_little_bro(
                 i,
             )
             await echo_ipc.send(msg)
+            await trio.sleep(0.001)
             resp = await echo_ipc.receive()
             print(
                 f'{caller} => {actor_name}: {msg}\n'
@@ -1006,6 +1012,9 @@ async def tell_little_bro(
             assert sub_uid != uid
             assert _i == i

+            # XXX, usually should never get here!
+            # await tractor.pause()
+

 @pytest.mark.parametrize(
     'raise_client_error',
@@ -1020,6 +1029,9 @@ def test_peer_spawns_and_cancels_service_subactor(
     raise_client_error: str,
     reg_addr: tuple[str, int],
     raise_sub_spawn_error_after: float|None,
+    loglevel: str,
+    # ^XXX, set to 'warning' to see masked-exc warnings
+    # that may transpire during actor-nursery teardown.
 ):
     # NOTE: this tests for the modden `mod wks open piker` bug
     # discovered as part of implementing workspace ctx
@@ -1049,6 +1061,7 @@ def test_peer_spawns_and_cancels_service_subactor(
             # NOTE: to halt the peer tasks on ctxc, uncomment this.
             debug_mode=debug_mode,
             registry_addrs=[reg_addr],
+            loglevel=loglevel,
         ) as an:
             server: Portal = await an.start_actor(
                 (server_name := 'spawn_server'),
@@ -163,7 +163,7 @@ def test_non_registrar_spawns_child(

         async with sub_ptl.open_context(
             get_root_portal,
-        ) as (ctx, first):
+        ) as (ctx, _):
             print('Waiting for `sub` to connect back to us..')

         await an.cancel()
@@ -70,6 +70,7 @@ from ._exceptions import (
     MsgTypeError,
     RemoteActorError,
     StreamOverrun,
+    TransportClosed,
     pack_from_raise,
     unpack_error,
 )
@@ -2428,10 +2429,7 @@ async def open_context_from_portal(
             try:
                 # await pause(shield=True)
                 await ctx.cancel()
-            except (
-                trio.BrokenResourceError,
-                trio.ClosedResourceError,
-            ):
+            except TransportClosed:
                 log.warning(
                     'IPC connection for context is broken?\n'
                     f'task: {ctx.cid}\n'
@@ -91,10 +91,13 @@ async def get_registry(


 @acm
-async def get_root(
-    **kwargs,
-) -> AsyncGenerator[Portal, None]:
+async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
+    '''
+    Deliver the current actor's "root process" actor (yes in actor
+    and proc tree terms) by delivering a `Portal` from the spawn-time
+    provided contact address.

+    '''
     # TODO: rename mailbox to `_root_maddr` when we finally
     # add and impl libp2p multi-addrs?
     addr = _runtime_vars['_root_mailbox']
@@ -193,6 +196,11 @@ async def maybe_open_portal(
     addr: UnwrappedAddress,
     name: str,
 ):
+    '''
+    Open a `Portal` to the actor serving @ `addr` or `None` if no
+    peer can be contacted or found.
+
+    '''
     async with query_actor(
         name=name,
         regaddr=addr,
@@ -329,18 +329,7 @@ class Portal:
             # if we get here some weird cancellation case happened
             return False

-        except (
-            # XXX, should never really get raised unless we aren't
-            # wrapping them in the below type by mistake?
-            #
-            # Leaving the catch here for now until we're very sure
-            # all the cases (for various tpt protos) have indeed been
-            # re-wrapped ;p
-            trio.ClosedResourceError,
-            trio.BrokenResourceError,
-
-            TransportClosed,
-        ) as tpt_err:
+        except TransportClosed as tpt_err:
             ipc_borked_report: str = (
                 f'IPC for actor already closed/broken?\n\n'
                 f'\n'
@@ -524,7 +524,7 @@ async def open_root_actor(
         # ?TODO, per-OS non-network-proto alt options?
         # -[ ] on linux we should be able to always use UDS?
         #
-        raddrs: list[Address] = _state._runtime_vars['_root_addrs']
+        raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
         raddrs.extend(
             accept_addrs,
         )
@@ -284,6 +284,15 @@ async def _errors_relayed_via_ipc(
     try:
         yield # run RPC invoke body

+    # NOTE, never REPL any pseudo-expected tpt-disconnect.
+    except TransportClosed as err:
+        rpc_err = err
+        log.warning(
+            f'Tpt disconnect during remote-exc relay due to,\n'
+            f'{err!r}\n'
+        )
+        raise err
+
     # box and ship RPC errors for wire-transit via
     # the task's requesting parent IPC-channel.
     except (
@@ -327,10 +336,15 @@ async def _errors_relayed_via_ipc(
             # recovery logic - the only case is some kind of
             # strange bug in our transport layer itself? Going
             # to keep this open ended for now.
-            log.debug(
-                'RPC task crashed, attempting to enter debugger\n'
-                f'|_{ctx}'
-            )
+            if _state.debug_mode():
+                log.exception(
+                    f'RPC task crashed!\n'
+                    f'Attempting to enter debugger\n'
+                    f'\n'
+                    f'{ctx}'
+                )
+
             entered_debug = await debug._maybe_enter_pm(
                 err,
                 api_frame=inspect.currentframe(),
@@ -419,7 +433,7 @@ async def _errors_relayed_via_ipc(
         # cancel scope will not have been inserted yet
         if is_rpc:
             log.warning(
-                'RPC task likely errored or cancelled before start?\n'
+                'RPC task likely crashed or cancelled before start?\n'
                 f'|_{ctx._task}\n'
                 f' >> {ctx.repr_rpc}\n'
             )
@@ -862,9 +876,9 @@ async def _invoke(
         )

         logmeth(
-            f'{message}\n'
+            f'{message}'
             f'\n'
-            f'{descr_str}\n'
+            f'{descr_str}'
         )


@@ -900,6 +914,11 @@ async def try_ship_error_to_remote(

     # XXX NOTE XXX in SC terms this is one of the worst things
     # that can happen and provides for a 2-general's dilemma..
+    #
+    # FURHTER, we should never really have to handle these
+    # lowlevel excs from `trio` since the `Channel.send()` layers
+    # downward should be mostly wrapping such cases in a
+    # tpt-closed; the `.critical()` usage is warranted.
     except (
         trio.ClosedResourceError,
         trio.BrokenResourceError,
@@ -183,6 +183,14 @@ class Actor:
     def is_registrar(self) -> bool:
         return self.is_arbiter

+    @property
+    def is_root(self) -> bool:
+        '''
+        This actor is the parent most in the tree?
+
+        '''
+        return _state.is_root_process()
+
     msg_buffer_size: int = 2**6

     # nursery placeholders filled in by `async_main()`,
@@ -38,6 +38,7 @@ import trio
 from ._exceptions import (
     ContextCancelled,
     RemoteActorError,
+    TransportClosed,
 )
 from .log import get_logger
 from .trionics import (
@@ -409,10 +410,8 @@ class MsgStream(trio.abc.Channel):
                 # it).
                 with trio.CancelScope(shield=True):
                     await self._ctx.send_stop()

             except (
-                trio.BrokenResourceError,
-                trio.ClosedResourceError
+                TransportClosed,
             ) as re:
                 # the underlying channel may already have been pulled
                 # in which case our stop message is meaningless since
@@ -593,9 +592,8 @@ class MsgStream(trio.abc.Channel):
                 ),
             )
         except (
-            trio.ClosedResourceError,
-            trio.BrokenResourceError,
             BrokenPipeError,
+            TransportClosed,
         ) as _trans_err:
             trans_err = _trans_err
             if (
@@ -1257,3 +1257,26 @@ async def breakpoint(
         api_frame=inspect.currentframe(),
         **kwargs,
     )
+
+
+async def maybe_pause_bp():
+    '''
+    Internal (ONLY for now) `breakpoint()`-er fn which only tries to
+    use the multi-actor `.pause()` API when the current actor is the
+    root.
+
+    ?! BUT WHY !?
+    -------
+
+    This is useful when debugging cases where the tpt layer breaks
+    (or is intentionally broken, say during resiliency testing) in
+    the case where a child can no longer contact the root process to
+    acquire the process-tree-singleton TTY lock.
+
+    '''
+    import tractor
+    actor = tractor.current_actor()
+    if actor.aid.name == 'root':
+        await tractor.pause(shield=True)
+    else:
+        tractor.devx.mk_pdb().set_trace()
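The commented-out call sites later in this diff hint at how the new helper is meant to be used; a hedged usage sketch (assuming the helper is reachable via `tractor.devx._trace` as those call sites suggest, and with a made-up wrapper name):

import tractor

async def on_tpt_teardown(err: Exception) -> None:
    # sketch: temporary debug hook while reproducing tpt breakage;
    # REPLs via the multi-actor pause API only in the root actor,
    # falls back to a plain local pdb in children.
    print(f'tpt closed: {err!r}')
    await tractor.devx._trace.maybe_pause_bp()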
@@ -307,7 +307,12 @@ class Channel:

     ) -> None:
         '''
-        Send a coded msg-blob over the transport.
+        Send a coded msg-blob over the underlying IPC transport.

+        This fn raises `TransportClosed` on comms failures and is
+        normally handled by higher level runtime machinery for the
+        expected-graceful cases, normally ephemercal
+        (re/dis)connects.
+
         '''
         __tracebackhide__: bool = hide_tb
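Given the new `Channel.send()` docstring above, a minimal sketch of how a higher layer can treat a send to an already-dead peer as an expected, non-fatal condition (illustrative only; `try_send()` is not part of this diff):

from tractor import TransportClosed

async def try_send(chan, msg) -> bool:
    # sketch: return False instead of crashing when the IPC link
    # was already torn down (locally or by the peer).
    try:
        await chan.send(msg)
        return True
    except TransportClosed:
        return False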
@@ -334,9 +339,10 @@ class Channel:
                 except KeyError:
                     raise err
             case TransportClosed():
+                src_exc_str: str = err.repr_src_exc()
                 log.transport(
-                    f'Transport stream closed due to\n'
-                    f'{err.repr_src_exc()}\n'
+                    f'Transport stream closed due to,\n'
+                    f'{src_exc_str}'
                 )

             case _:
@@ -345,6 +351,11 @@ class Channel:
                 raise

     async def recv(self) -> Any:
+        '''
+        Receive the latest (queued) msg-blob from the underlying IPC
+        transport.
+
+        '''
         assert self._transport
         return await self._transport.recv()

@@ -418,16 +429,18 @@ class Channel:
         self
     ) -> AsyncGenerator[Any, None]:
         '''
-        Yield `MsgType` IPC msgs decoded and deliverd from
-        an underlying `MsgTransport` protocol.
+        Yield `MsgType` IPC msgs decoded and deliverd from an
+        underlying `MsgTransport` protocol.

-        This is a streaming routine alo implemented as an async-gen
-        func (same a `MsgTransport._iter_pkts()`) gets allocated by
-        a `.__call__()` inside `.__init__()` where it is assigned to
-        the `._aiter_msgs` attr.
+        This is a streaming routine alo implemented as an
+        async-generator func (same a `MsgTransport._iter_pkts()`)
+        gets allocated by a `.__call__()` inside `.__init__()` where
+        it is assigned to the `._aiter_msgs` attr.

         '''
-        assert self._transport
+        if not self._transport:
+            raise RuntimeError('No IPC transport initialized!?')
+
         while True:
             try:
                 async for msg in self._transport:
@@ -462,7 +475,15 @@ class Channel:
                 # continue

     def connected(self) -> bool:
-        return self._transport.connected() if self._transport else False
+        '''
+        Predicate whether underlying IPC tpt is connected.
+
+        '''
+        return (
+            self._transport.connected()
+            if self._transport
+            else False
+        )

     async def _do_handshake(
         self,
@@ -493,8 +514,11 @@ async def _connect_chan(
     addr: UnwrappedAddress
 ) -> typing.AsyncGenerator[Channel, None]:
     '''
-    Create and connect a channel with disconnect on context manager
-    teardown.
+    Create and connect a `Channel` to the provided `addr`, disconnect
+    it on cm exit.

+    NOTE, this is a lowlevel, normally internal-only iface. You
+    should likely use `.open_portal()` instead.
+
     '''
     chan = await Channel.from_addr(addr)
@@ -154,7 +154,6 @@ class MsgTransport(Protocol):
     # ...

-

 class MsgpackTransport(MsgTransport):

     # TODO: better naming for this?
@@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport):
         except trio.ClosedResourceError as cre:
             closure_err = cre

+            # await tractor.devx._trace.maybe_pause_bp()
+
             raise TransportClosed(
                 message=(
-                    f'{tpt_name} was already closed locally ?\n'
+                    f'{tpt_name} was already closed locally?'
                 ),
                 src_exc=closure_err,
                 loglevel='error',
                 raise_on_report=(
-                    'another task closed this fd' in closure_err.args
+                    'another task closed this fd'
+                    in
+                    closure_err.args
                 ),
             ) from closure_err

@@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport):
             trans_err = _re
             tpt_name: str = f'{type(self).__name__!r}'

+            trans_err_msg: str = trans_err.args[0]
+            by_whom: str = {
+                'another task closed this fd': 'locally',
+                'this socket was already closed': 'by peer',
+            }.get(trans_err_msg)
             match trans_err:

                 # XXX, specifc to UDS transport and its,
@@ -446,38 +454,42 @@ class MsgpackTransport(MsgTransport):
                 case trio.BrokenResourceError() if (
                     '[Errno 32] Broken pipe'
                     in
-                    trans_err.args[0]
+                    trans_err_msg
                 ):
                     tpt_closed = TransportClosed.from_src_exc(
                         message=(
                             f'{tpt_name} already closed by peer\n'
                         ),
-                        body=f'{self}\n',
+                        body=f'{self}',
                         src_exc=trans_err,
                         raise_on_report=True,
                         loglevel='transport',
                     )
                     raise tpt_closed from trans_err

-                # case trio.ClosedResourceError() if (
-                #     'this socket was already closed'
-                #     in
-                #     trans_err.args[0]
-                # ):
-                #     tpt_closed = TransportClosed.from_src_exc(
-                #         message=(
-                #             f'{tpt_name} already closed by peer\n'
-                #         ),
-                #         body=f'{self}\n',
-                #         src_exc=trans_err,
-                #         raise_on_report=True,
-                #         loglevel='transport',
-                #     )
-                #     raise tpt_closed from trans_err
-
-                # unless the disconnect condition falls under "a
-                # normal operation breakage" we usualy console warn
-                # about it.
+                # ??TODO??, what case in piker does this and HOW
+                # CAN WE RE-PRODUCE IT?!?!?
+                case trio.ClosedResourceError() if (
+                    by_whom
+                ):
+                    tpt_closed = TransportClosed.from_src_exc(
+                        message=(
+                            f'{tpt_name} was already closed {by_whom!r}?\n'
+                        ),
+                        body=f'{self}',
+                        src_exc=trans_err,
+                        raise_on_report=True,
+                        loglevel='transport',
+                    )
+
+                    # await tractor.devx._trace.maybe_pause_bp()
+                    raise tpt_closed from trans_err
+
+                # XXX, unless the disconnect condition falls
+                # under "a normal/expected operating breakage"
+                # (per the `trans_err_msg` guards in the cases
+                # above) we usualy console-error about it and
+                # raise-thru. about it.
                 case _:
                     log.exception(
                         f'{tpt_name} layer failed pre-send ??\n'