Compare commits


4 Commits

Author SHA1 Message Date
Tyler Goodlet 58cc57a422 Move `Portal.open_context()` impl to `._context`
Finally, since normally you need the content from `._context.Context`
and surroundings in order to effectively grok `Portal.open_context()`
anyway, might as well move the impl to the ctx module as
`open_context_from_portal()` and just bind it on the `Portal` class def.

Associated/required tweaks:
- avoid circ import on `.devx` by only importing
  `.maybe_wait_for_debugger()` when debug mode is set.
- drop `async_generator` usage; not sure why this hadn't already been
  changed to `contextlib`?
- use `@acm` alias throughout `._portal`
2024-03-13 12:09:38 -04:00
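For reference, the "module-level `@acm` bound as a method" pattern described above boils down to something like the following sketch (the bodies here are illustrative stand-ins, not the actual `tractor` internals):

```python
from contextlib import asynccontextmanager as acm


@acm
async def open_context_from_portal(portal, func, **kwargs):
    # (illustrative) set up a remote task through `portal`, hand
    # control back to the caller, then run teardown / final result
    # collection when the `async with` block exits.
    ctx = object()  # stand-in for a real `Context`
    try:
        yield ctx, None  # the real API yields a `(ctx, first)` pair
    finally:
        pass


class Portal:
    # binding the module-level acm on the class def keeps the
    # existing `Portal.open_context()` call-sites working: accessing
    # it on an instance binds `self` as the `portal` arg.
    open_context = open_context_from_portal
```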
Tyler Goodlet da913ef2bb Attempt at better internal traceback hiding
Previously I was trying to approach this using lots of
`__tracebackhide__`s in various internal funcs, but since it's not
exactly straightforward to do this inside core deps like `trio` and the
stdlib, it makes a bit more sense to optionally catch and re-raise
certain classes of errors from their originals using `raise .. from` syntax
as per:
https://docs.python.org/3/library/exceptions.html#exception-context

Deats:
- litter `._context` methods with `__tracebackhide__`/`hide_tb` since
  their frames were previously being shown but don't need to be exposed
  to application code now that cancel semantics testing is finished up.
- I originally did the same in `._ipc` but later commented it all out,
  since catching and re-raising errors in higher level layers
  (above the transport) seems to be a much saner approach.
- add catch-n-reraise-from in `MsgStream.send()`/`.receive()` to avoid
  seeing the depths of `trio` and/or our `._ipc` layers on comms errors
  (a minimal sketch of the pattern follows below).
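A minimal sketch of that catch-n-reraise-from idea, assuming a made-up `IPCTransportError` and a plain `trio` stream purely for illustration:

```python
import trio


class IPCTransportError(Exception):
    '''
    Stand-in for a higher level comms-error type.
    '''


async def send_all_msg_bytes(
    stream: trio.abc.SendStream,
    data: bytes,
) -> None:
    # hide this frame from user-facing tracebacks
    __tracebackhide__: bool = True
    try:
        await stream.send_all(data)
    except (
        trio.BrokenResourceError,
        trio.ClosedResourceError,
    ) as trans_err:
        # `raise .. from ..` chains the original as `__cause__` so
        # the deep `trio` frames aren't the face of the error shown
        # to application code.
        raise IPCTransportError(
            'underlying transport already closed?'
        ) from trans_err
```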

Further, this patch adds some refactoring to use the
same remote-error shipper routine from both the actor core and the RPC
invoker:
- rename it as `try_ship_error_to_remote()` and call it from
  `._invoke()` as well as its prior usage (rough sketch below).
- make it optionally accept a `cid: str`, a `remote_descr: str` and, of
  course, a `hide_tb: bool`.
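Roughly, such a shipper just boxes the error into a msg and best-effort sends it over the channel, swallowing transport-closed errors. This is only a hedged sketch of the described signature; the `pack_error()` call, the logging, and the default values are assumptions rather than the exact impl:

```python
import trio
from tractor.log import get_logger
from tractor._exceptions import pack_error  # boxes an exc into a wire msg

log = get_logger(__name__)


async def try_ship_error_to_remote(
    channel,                       # an IPC `Channel`-like object
    err: BaseException,
    cid: str|None = None,
    remote_descr: str = 'parent',
    hide_tb: bool = True,
) -> None:
    '''
    Best-effort ship a boxed error msg to the peer; never blow up
    on an already-dead transport.
    '''
    __tracebackhide__: bool = hide_tb
    try:
        msg: dict = pack_error(err)
        # (the real impl presumably also threads `cid` into the msg)
        await channel.send(msg)
    except (
        trio.ClosedResourceError,
        trio.BrokenResourceError,
    ):
        log.critical(
            f'Failed to ship error to {remote_descr} !?\n'
            f'{err}\n'
        )
```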

Other misc tweaks:
- add some todo notes around `Actor.load_modules()` debug hooking.
- tweak the zombie reaper log msg and timeout value ;)
2024-03-13 10:44:51 -04:00
Tyler Goodlet 96992bcbb9 Add (back) a `tractor._testing` sub-pkg
Since importing from our top level `conftest.py` is not scalable
or as "future forward thinking" in terms of:
- LoC-wise (it's only one file),
- it prevents "external" (aka non-test) example scripts from importing
  content easily,
- it seemingly(?) can't be used via abs-import if using
  a `[tool.pytest.ini_options]` in a `pyproject.toml` vs.
  a `pytest.ini`, see:
  https://docs.pytest.org/en/8.0.x/reference/customize.html#pyproject-toml

=> Go back to having an internal "testing" pkg like `trio` (kinda) does.

Deats:
- move generic top level helpers into the pkg-mod including the new
  `expect_ctxc()` (which I needed in the advanced faults testing script;
  a usage sketch follows after this commit's entry).
- move `@tractor_test` into `._testing.pytest` sub-mod.
- adjust all the helper imports to be a `from tractor._testing import <..>`

Rework `test_ipc_channel_break_during_stream()` and backing script:
- make test(s) pull `debug_mode` from new fixture (which is now
  controlled manually from `--tpdb` flag) and drop the previous
  parametrized input.
- update logic in ^ test for "which-side-fails" cases to better match
  recently updated/stricter cancel/failure semantics in terms of
  `ClosedResourceError` vs. `EndOfChannel` expectations.
- handle `ExceptionGroup`s with expected embedded errors in test.
- be more pedantic about whether to expect a user-simulated KBI.
- for `examples/advanced_faults/ipc_failure_during_stream.py` script:
  - generalize ipc breakage in new `break_ipc()` with support for diff
    internal `trio` methods and a #TODO for future disti frameworks
  - only make one sub-actor task break and the other just stream.
  - use new `._testing.expect_ctxc()` around ctx block.
  - add a bit of exception handling with `print()`s around ctxc (unused
    except if 'msg' break method is set) and eoc cases.
  - don't break parent-side ipc in the loop any more than once
    after the first break, checked via a flag var.
  - add a `pre_close: bool` flag to control whether
    `MsgStream.aclose()` is called *before* any ipc breakage method.

Still TODO:
- drop `pytest.ini` and add the alt section to `pyproject.toml`.
 -> currently can't get `--rootdir=` opt to work.. not showing in
   console header.
 -> ^ also breaks on 'tests' `enable_modules` imports in subactors
   during discovery tests?
2024-03-13 09:09:08 -04:00
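For reference, typical usage of the `expect_ctxc()` helper mentioned above (its previous `conftest.py` definition is visible in the diff further down); the directly raised `ContextCancelled` here is purely for illustration:

```python
import trio
import tractor
from tractor._testing import expect_ctxc


async def main():
    # `yay=True` demands that the wrapped block raises
    # a `ContextCancelled`; with `reraise=False` the acm absorbs it.
    async with expect_ctxc(yay=True, reraise=False):
        # stand-in for a ctx block that gets remotely cancelled
        raise tractor.ContextCancelled('simulated peer cancel')

    # `yay=False` is just a passthrough.
    async with expect_ctxc(yay=False):
        await trio.sleep(0)


if __name__ == '__main__':
    trio.run(main)
```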
Tyler Goodlet 6533285d7d Add `an: ActorNursery` var placeholder for final log msg 2024-03-12 08:56:17 -04:00
27 changed files with 1246 additions and 888 deletions


@ -6,40 +6,112 @@ been an outage) and we want to ensure that despite being in debug mode
actor tree will eventually be cancelled without leaving any zombies. actor tree will eventually be cancelled without leaving any zombies.
''' '''
import trio from functools import partial
from tractor import ( from tractor import (
open_nursery, open_nursery,
context, context,
Context, Context,
ContextCancelled,
MsgStream, MsgStream,
_testing,
)
import trio
async def break_ipc(
stream: MsgStream,
method: str|None = None,
pre_close: bool = False,
def_method: str = 'eof',
) -> None:
'''
XXX: close the channel right after an error is raised
purposely breaking the IPC transport to make sure the parent
doesn't get stuck in debug or hang on the connection join.
this more or less simulates an infinite msg-receive hang on
the other end.
'''
# close channel via IPC prot msging before
# any transport breakage
if pre_close:
await stream.aclose()
method: str = method or def_method
match method:
case 'trans_aclose':
await stream._ctx.chan.transport.stream.aclose()
case 'eof':
await stream._ctx.chan.transport.stream.send_eof()
case 'msg':
await stream._ctx.chan.send(None)
# TODO: the actual real-world simulated cases like
# transport layer hangs and/or lower layer 2-gens type
# scenarios..
#
# -[ ] already have some issues for this general testing
# area:
# - https://github.com/goodboy/tractor/issues/97
# - https://github.com/goodboy/tractor/issues/124
# - PR from @guille:
# https://github.com/goodboy/tractor/pull/149
# case 'hang':
# TODO: framework research:
#
# - https://github.com/GuoTengda1993/pynetem
# - https://github.com/shopify/toxiproxy
# - https://manpages.ubuntu.com/manpages/trusty/man1/wirefilter.1.html
case _:
raise RuntimeError(
f'IPC break method unsupported: {method}'
) )
async def break_channel_silently_then_error( async def break_ipc_then_error(
stream: MsgStream, stream: MsgStream,
break_ipc_with: str|None = None,
pre_close: bool = False,
): ):
async for msg in stream: async for msg in stream:
await stream.send(msg) await stream.send(msg)
await break_ipc(
# XXX: close the channel right after an error is raised stream=stream,
# purposely breaking the IPC transport to make sure the parent method=break_ipc_with,
# doesn't get stuck in debug or hang on the connection join. pre_close=pre_close,
# this more or less simulates an infinite msg-receive hang on )
# the other end.
await stream._ctx.chan.send(None)
assert 0 assert 0
async def close_stream_and_error( # async def close_stream_and_error(
async def iter_ipc_stream(
stream: MsgStream, stream: MsgStream,
break_ipc_with: str|None = None,
pre_close: bool = False,
): ):
async for msg in stream: async for msg in stream:
await stream.send(msg) await stream.send(msg)
# wipe out channel right before raising # wipe out channel right before raising
await stream._ctx.chan.send(None) # await break_ipc(
await stream.aclose() # stream=stream,
assert 0 # method=break_ipc_with,
# pre_close=pre_close,
# )
# send channel close msg at SC-prot level
#
# TODO: what should get raised here if anything?
# await stream.aclose()
# assert 0
@context @context
@ -47,6 +119,7 @@ async def recv_and_spawn_net_killers(
ctx: Context, ctx: Context,
break_ipc_after: bool|int = False, break_ipc_after: bool|int = False,
pre_close: bool = False,
) -> None: ) -> None:
''' '''
@ -63,27 +136,42 @@ async def recv_and_spawn_net_killers(
await stream.send(i) await stream.send(i)
if ( if (
break_ipc_after break_ipc_after
and i > break_ipc_after and
i > break_ipc_after
): ):
'#################################\n' '#################################\n'
'Simulating child-side IPC BREAK!\n' 'Simulating CHILD-side IPC BREAK!\n'
'#################################' '#################################\n'
n.start_soon(break_channel_silently_then_error, stream) n.start_soon(
n.start_soon(close_stream_and_error, stream) partial(
break_ipc_then_error,
stream=stream,
pre_close=pre_close,
)
)
n.start_soon(
iter_ipc_stream,
stream,
)
async def main( async def main(
debug_mode: bool = False, debug_mode: bool = False,
start_method: str = 'trio', start_method: str = 'trio',
loglevel: str = 'cancel',
# by default we break the parent IPC first (if configured to break # by default we break the parent IPC first (if configured to break
# at all), but this can be changed so the child does first (even if # at all), but this can be changed so the child does first (even if
# both are set to break). # both are set to break).
break_parent_ipc_after: int|bool = False, break_parent_ipc_after: int|bool = False,
break_child_ipc_after: int|bool = False, break_child_ipc_after: int|bool = False,
pre_close: bool = False,
) -> None: ) -> None:
# from tractor._state import _runtime_vars as rtv
# rtv['_debug_mode'] = debug_mode
async with ( async with (
open_nursery( open_nursery(
start_method=start_method, start_method=start_method,
@ -91,57 +179,107 @@ async def main(
# NOTE: even debugger is used we shouldn't get # NOTE: even debugger is used we shouldn't get
# a hang since it never engages due to broken IPC # a hang since it never engages due to broken IPC
debug_mode=debug_mode, debug_mode=debug_mode,
loglevel='warning', loglevel=loglevel,
) as an, ) as an,
): ):
sub_name: str = 'chitty_hijo'
portal = await an.start_actor( portal = await an.start_actor(
'chitty_hijo', sub_name,
enable_modules=[__name__], enable_modules=[__name__],
) )
async with portal.open_context( async with (
_testing.expect_ctxc(
yay=(
break_parent_ipc_after
or break_child_ipc_after,
),
# TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either
# the stream API after the `None` msg is sent
# (which actually implicitly cancels all remote
# tasks in the hijo) or from simluated
# KBI-mash-from-user
# or should we expect that a KBI triggers the ctxc
# and KBI in an eg?
reraise=True,
),
portal.open_context(
recv_and_spawn_net_killers, recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after, break_ipc_after=break_child_ipc_after,
pre_close=pre_close,
) as (ctx, sent): ) as (ctx, sent),
):
ipc_break_sent: bool = False
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
for i in range(1000): for i in range(1000):
if ( if (
break_parent_ipc_after break_parent_ipc_after
and i > break_parent_ipc_after and
i > break_parent_ipc_after
and
not ipc_break_sent
): ):
print( print(
'#################################\n' '#################################\n'
'Simulating parent-side IPC BREAK!\n' 'Simulating PARENT-side IPC BREAK!\n'
'#################################' '#################################\n'
) )
await stream._ctx.chan.send(None)
# await stream._ctx.chan.send(None)
# await stream._ctx.chan.transport.stream.send_eof()
await stream._ctx.chan.transport.stream.aclose()
ipc_break_sent = True
# it actually breaks right here in the # it actually breaks right here in the
# mp_spawn/forkserver backends and thus the zombie # mp_spawn/forkserver backends and thus the zombie
# reaper never even kicks in? # reaper never even kicks in?
print(f'parent sending {i}') print(f'parent sending {i}')
try:
await stream.send(i) await stream.send(i)
except ContextCancelled as ctxc:
print(
'parent received ctxc on `stream.send()`\n'
f'{ctxc}\n'
)
assert 'root' in ctxc.canceller
assert sub_name in ctx.canceller
with trio.move_on_after(2) as cs: # TODO: is this needed or no?
raise
timeout: int = 1
print(f'Entering `stream.receive()` with timeout={timeout}\n')
with trio.move_on_after(timeout) as cs:
# NOTE: in the parent side IPC failure case this # NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child # will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's # is killed and sends a stop msg back to it's
# caller/this-parent. # caller/this-parent.
try:
rx = await stream.receive() rx = await stream.receive()
print(
print(f"I'm a happy user and echoed to me is {rx}") "I'm a happy PARENT user and echoed to me is\n"
f'{rx}\n'
)
except trio.EndOfChannel:
print('MsgStream got EoC for PARENT')
raise
if cs.cancelled_caught: if cs.cancelled_caught:
# pretend to be a user seeing no streaming action # pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c.. # thinking it's a hang, and then hitting ctl-c..
print("YOO i'm a user anddd thingz hangin..") print(
f"YOO i'm a PARENT user anddd thingz hangin..\n"
f'after timeout={timeout}\n'
)
print( print(
"YOO i'm mad send side dun but thingz hangin..\n" "YOO i'm mad!\n"
'The send side is dun but thingz hangin..\n'
'MASHING CTlR-C Ctl-c..' 'MASHING CTlR-C Ctl-c..'
) )
raise KeyboardInterrupt raise KeyboardInterrupt


@@ -26,3 +26,23 @@ all_bullets = true
 directory = "trivial"
 name = "Trivial/Internal Changes"
 showcontent = true
+
+[tool.pytest.ini_options]
+minversion = '6.0'
+testpaths = [
+    'tests'
+]
+addopts = [
+    # TODO: figure out why this isn't working..
+    '--rootdir=./tests',
+    '--import-mode=importlib',
+    # don't show frickin captured logs AGAIN in the report..
+    '--show-capture=no',
+]
+log_cli = false
+
+# TODO: maybe some of these layout choices?
+# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
+# pythonpath = "src"


@@ -1,8 +0,0 @@
-# vim: ft=conf
-# pytest.ini for tractor
-[pytest]
-# don't show frickin captured logs AGAIN in the report..
-addopts = --show-capture='no'
-log_cli = false
-; minversion = 6.0


@ -1,105 +1,25 @@
""" """
``tractor`` testing!! ``tractor`` testing!!
""" """
from contextlib import asynccontextmanager as acm
import sys import sys
import subprocess import subprocess
import os import os
import random import random
import signal import signal
import platform import platform
import pathlib
import time import time
import inspect
from functools import partial, wraps
import pytest import pytest
import trio
import tractor import tractor
from tractor._testing import (
examples_dir as examples_dir,
tractor_test as tractor_test,
expect_ctxc as expect_ctxc,
)
# TODO: include wtv plugin(s) we build in `._testing.pytest`?
pytest_plugins = ['pytester'] pytest_plugins = ['pytester']
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
If fixtures:
- ``reg_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
"""
@wraps(fn)
def wrapper(
*args,
loglevel=None,
reg_addr=None,
start_method: str|None = None,
debug_mode: bool = False,
**kwargs
):
# __tracebackhide__ = True
# NOTE: inject ant test func declared fixture
# names by manually checking!
if 'reg_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['reg_addr'] = reg_addr
if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['start_method'] = start_method
if 'debug_mode' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['debug_mode'] = debug_mode
if kwargs:
# use explicit root actor start
async def _main():
async with tractor.open_root_actor(
# **kwargs,
registry_addrs=[reg_addr] if reg_addr else None,
loglevel=loglevel,
start_method=start_method,
# TODO: only enable when pytest is passed --pdb
debug_mode=debug_mode,
):
await fn(*args, **kwargs)
main = _main
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs)
return trio.run(main)
return wrapper
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows': if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT _KILL_SIGNAL = signal.CTRL_BREAK_EVENT
@ -119,23 +39,6 @@ no_windows = pytest.mark.skipif(
) )
def repodir() -> pathlib.Path:
'''
Return the abspath to the repo directory.
'''
# 2 parents up to step up through tests/<repo_dir>
return pathlib.Path(__file__).parent.parent.absolute()
def examples_dir() -> pathlib.Path:
'''
Return the abspath to the examples directory as `pathlib.Path`.
'''
return repodir() / 'examples'
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption( parser.addoption(
"--ll", "--ll",
@ -194,11 +97,18 @@ _ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def ci_env() -> bool: def ci_env() -> bool:
"""Detect CI envoirment. '''
""" Detect CI envoirment.
'''
return _ci_env return _ci_env
# TODO: also move this to `._testing` for now?
# -[ ] possibly generalize and re-use for multi-tree spawning
# along with the new stuff for multi-addrs in distribute_dis
# branch?
#
# choose randomly at import time # choose randomly at import time
_reg_addr: tuple[str, int] = ( _reg_addr: tuple[str, int] = (
'127.0.0.1', '127.0.0.1',
@ -252,6 +162,7 @@ def sig_prog(proc, sig):
assert ret assert ret
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture @pytest.fixture
def daemon( def daemon(
loglevel: str, loglevel: str,
@ -293,26 +204,3 @@ def daemon(
time.sleep(_PROC_SPAWN_WAIT) time.sleep(_PROC_SPAWN_WAIT)
yield proc yield proc
sig_prog(proc, _INT_SIGNAL) sig_prog(proc, _INT_SIGNAL)
@acm
async def expect_ctxc(
yay: bool,
reraise: bool = False,
) -> None:
'''
Small acm to catch `ContextCancelled` errors when expected
below it in a `async with ()` block.
'''
if yay:
try:
yield
raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled:
if reraise:
raise
else:
return
else:
yield


@ -3,24 +3,28 @@ Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
cancelacion?.. cancelacion?..
''' '''
import itertools
from functools import partial from functools import partial
from types import ModuleType
import pytest import pytest
from _pytest.pathlib import import_path from _pytest.pathlib import import_path
import trio import trio
import tractor import tractor
from tractor._testing import (
from conftest import (
examples_dir, examples_dir,
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'debug_mode', 'pre_aclose_msgstream',
[False, True], [
False,
True,
],
ids=[ ids=[
'no_debug_mode', 'no_msgstream_aclose',
'debug_mode', 'pre_aclose_msgstream',
], ],
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -66,8 +70,10 @@ from conftest import (
) )
def test_ipc_channel_break_during_stream( def test_ipc_channel_break_during_stream(
debug_mode: bool, debug_mode: bool,
loglevel: str,
spawn_backend: str, spawn_backend: str,
ipc_break: dict|None, ipc_break: dict|None,
pre_aclose_msgstream: bool,
): ):
''' '''
Ensure we can have an IPC channel break its connection during Ensure we can have an IPC channel break its connection during
@ -79,77 +85,123 @@ def test_ipc_channel_break_during_stream(
''' '''
if spawn_backend != 'trio': if spawn_backend != 'trio':
if debug_mode: # if debug_mode:
pytest.skip('`debug_mode` only supported on `trio` spawner') # pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that # non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree. # requires the user to do ctl-c to cancel the actor tree.
expect_final_exc = trio.ClosedResourceError expect_final_exc = trio.ClosedResourceError
mod = import_path( mod: ModuleType = import_path(
examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py',
root=examples_dir(), root=examples_dir(),
) )
expect_final_exc = KeyboardInterrupt # by def we expect KBI from user after a simulated "hang
# period" wherein the user eventually hits ctl-c to kill the
# when ONLY the child breaks we expect the parent to get a closed # root-actor tree.
# resource error on the next `MsgStream.receive()` and then fail out expect_final_exc: BaseException = KeyboardInterrupt
# and cancel the child from there.
if ( if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
# AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream
):
expect_final_exc = trio.EndOfChannel
# only child breaks # NOTE when ONLY the child breaks or it breaks BEFORE the
( # parent we expect the parent to get a closed resource error
# on the next `MsgStream.receive()` and then fail out and
# cancel the child from there.
#
# ONLY CHILD breaks
if (
ipc_break['break_child_ipc_after'] ipc_break['break_child_ipc_after']
and ipc_break['break_parent_ipc_after'] is False and
) ipc_break['break_parent_ipc_after'] is False
):
expect_final_exc = trio.ClosedResourceError
# both break but, parent breaks first # if child calls `MsgStream.aclose()` then expect EoC.
or ( if pre_aclose_msgstream:
expect_final_exc = trio.EndOfChannel
# BOTH but, CHILD breaks FIRST
elif (
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and ( and (
ipc_break['break_parent_ipc_after'] ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after'] > ipc_break['break_child_ipc_after']
) )
)
): ):
expect_final_exc = trio.ClosedResourceError expect_final_exc = trio.ClosedResourceError
# when the parent IPC side dies (even if the child's does as well # child will send a 'stop' msg before it breaks
# but the child fails BEFORE the parent) we expect the channel to be # the transport channel.
# sent a stop msg from the child at some point which will signal the if pre_aclose_msgstream:
# parent that the stream has been terminated. expect_final_exc = trio.EndOfChannel
# NOTE: when the parent breaks "after" the child you get this same
# case as well, the child breaks the IPC channel with a stop msg
# before any closure takes place.
elif (
# only parent breaks
(
ipc_break['break_parent_ipc_after']
and ipc_break['break_child_ipc_after'] is False
)
# both break but, child breaks first # NOTE when the parent IPC side dies (even if the child's does as well
or ( # but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect
# a stop msg since the parent-side ctx apis will error out
# IMMEDIATELY before the child ever sends any 'stop' msg.
#
# ONLY PARENT breaks
elif (
ipc_break['break_parent_ipc_after']
and
ipc_break['break_child_ipc_after'] is False
):
expect_final_exc = trio.ClosedResourceError
# BOTH but, PARENT breaks FIRST
elif (
ipc_break['break_parent_ipc_after'] is not False ipc_break['break_parent_ipc_after'] is not False
and ( and (
ipc_break['break_child_ipc_after'] ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after'] > ipc_break['break_parent_ipc_after']
) )
)
): ):
expect_final_exc = trio.EndOfChannel expect_final_exc = trio.ClosedResourceError
with pytest.raises(expect_final_exc): with pytest.raises(
expected_exception=(
expect_final_exc,
ExceptionGroup,
),
) as excinfo:
try:
trio.run( trio.run(
partial( partial(
mod.main, mod.main,
debug_mode=debug_mode, debug_mode=debug_mode,
start_method=spawn_backend, start_method=spawn_backend,
loglevel=loglevel,
pre_close=pre_aclose_msgstream,
**ipc_break, **ipc_break,
) )
) )
except KeyboardInterrupt as kbi:
_err = kbi
if expect_final_exc is not KeyboardInterrupt:
pytest.fail(
'Rxed unexpected KBI !?\n'
f'{repr(kbi)}'
)
raise
# get raw instance from pytest wrapper
value = excinfo.value
if isinstance(value, ExceptionGroup):
value = next(
itertools.dropwhile(
lambda exc: not isinstance(exc, expect_final_exc),
value.exceptions,
)
)
assert value
@tractor.context @tractor.context


@@ -15,8 +15,10 @@ from exceptiongroup import (
 import pytest
 import trio
 import tractor
-from conftest import tractor_test, no_windows
+from tractor._testing import (
+    tractor_test,
+)
+from conftest import no_windows
 def is_win():


@@ -5,9 +5,7 @@ import trio
 import tractor
 from tractor import open_actor_cluster
 from tractor.trionics import gather_contexts
-from conftest import tractor_test
+from tractor._testing import tractor_test
 MESSAGE = 'tractoring at full speed'


@@ -25,7 +25,7 @@ from tractor._exceptions import (
     ContextCancelled,
 )
-from conftest import (
+from tractor._testing import (
     tractor_test,
     expect_ctxc,
 )

View File

@@ -30,8 +30,10 @@ from tractor.devx._debug import (
     _pause_msg,
     _crash_msg,
 )
-from conftest import (
+from tractor._testing import (
     examples_dir,
+)
+from conftest import (
     _ci_env,
 )

View File

@@ -9,10 +9,9 @@ import itertools
 import pytest
 import tractor
+from tractor._testing import tractor_test
 import trio
-from conftest import tractor_test
 @tractor_test
 async def test_reg_then_unreg(reg_addr):


@@ -11,8 +11,7 @@ import platform
 import shutil
 import pytest
-from conftest import (
+from tractor._testing import (
     examples_dir,
 )

View File

@@ -18,8 +18,7 @@ from tractor import (
     ContextCancelled,
 )
 from tractor.trionics import BroadcastReceiver
-from conftest import expect_ctxc
+from tractor._testing import expect_ctxc
 async def sleep_and_err(


@@ -9,7 +9,7 @@ import trio
 import tractor
 import pytest
-from conftest import tractor_test
+from tractor._testing import tractor_test
 def test_must_define_ctx():


@@ -7,7 +7,7 @@ import pytest
 import trio
 import tractor
-from conftest import tractor_test
+from tractor._testing import tractor_test
 @pytest.mark.trio


@@ -7,8 +7,10 @@ import time
 import pytest
 import trio
 import tractor
-from conftest import (
+from tractor._testing import (
     tractor_test,
+)
+from conftest import (
     sig_prog,
     _INT_SIGNAL,
     _INT_RETURN_CODE,

View File

@@ -5,8 +5,7 @@ import pytest
 import trio
 import tractor
 from tractor.experimental import msgpub
-from conftest import tractor_test
+from tractor._testing import tractor_test
 def test_type_checks():

View File

@@ -8,7 +8,7 @@ import pytest
 import trio
 import tractor
-from conftest import tractor_test
+from tractor._testing import tractor_test
 _file_path: str = ''


@@ -8,7 +8,7 @@ import pytest
 import trio
 import tractor
-from conftest import tractor_test
+from tractor._testing import tractor_test
 data_to_pass_down = {'doggy': 10, 'kitty': 4}


@ -43,7 +43,6 @@ import warnings
import trio import trio
from .msg import NamespacePath
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
InternalError, InternalError,
@ -51,11 +50,16 @@ from ._exceptions import (
StreamOverrun, StreamOverrun,
pack_error, pack_error,
unpack_error, unpack_error,
_raise_from_no_key_in_msg,
) )
from .log import get_logger from .log import get_logger
from .msg import NamespacePath
from ._ipc import Channel from ._ipc import Channel
from ._streaming import MsgStream from ._streaming import MsgStream
from ._state import current_actor from ._state import (
current_actor,
debug_mode,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
@ -1021,6 +1025,8 @@ class Context:
assert self._scope assert self._scope
self._scope.cancel() self._scope.cancel()
# TODO? should we move this to `._streaming` much like we
# moved `Portal.open_context()`'s def to this mod?
@acm @acm
async def open_stream( async def open_stream(
self, self,
@ -1198,8 +1204,12 @@ class Context:
# TODO: replace all the instances of this!! XD # TODO: replace all the instances of this!! XD
def maybe_raise( def maybe_raise(
self, self,
hide_tb: bool = True,
**kwargs, **kwargs,
) -> Exception|None: ) -> Exception|None:
__tracebackhide__: bool = hide_tb
if re := self._remote_error: if re := self._remote_error:
return self._maybe_raise_remote_err( return self._maybe_raise_remote_err(
re, re,
@ -1209,8 +1219,10 @@ class Context:
def _maybe_raise_remote_err( def _maybe_raise_remote_err(
self, self,
remote_error: Exception, remote_error: Exception,
raise_ctxc_from_self_call: bool = False, raise_ctxc_from_self_call: bool = False,
raise_overrun_from_self: bool = True, raise_overrun_from_self: bool = True,
hide_tb: bool = True,
) -> ( ) -> (
ContextCancelled # `.cancel()` request to far side ContextCancelled # `.cancel()` request to far side
@ -1222,6 +1234,7 @@ class Context:
a cancellation (if any). a cancellation (if any).
''' '''
__tracebackhide__: bool = hide_tb
our_uid: tuple = self.chan.uid our_uid: tuple = self.chan.uid
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
@ -1305,7 +1318,7 @@ class Context:
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
async def result( async def result(
self, self,
hide_tb: bool = False, hide_tb: bool = True,
) -> Any|Exception: ) -> Any|Exception:
''' '''
@ -1841,6 +1854,541 @@ class Context:
return False return False
@acm
async def open_context_from_portal(
portal: Portal,
func: Callable,
allow_overruns: bool = False,
# TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show?
hide_tb: bool = True,
# proxied to RPC
**kwargs,
) -> AsyncGenerator[tuple[Context, Any], None]:
'''
Open an inter-actor "task context"; a remote task is
scheduled and cancel-scope-state-linked to a `trio.run()` across
memory boundaries in another actor's runtime.
This is an `@acm` API bound as `Portal.open_context()` which
allows for deterministic setup and teardown of a remotely
scheduled task in another remote actor. Once opened, the 2 now
"linked" tasks run completely in parallel in each actor's
runtime with their enclosing `trio.CancelScope`s kept in
a synced state wherein if either side errors or cancels an
equivalent error is relayed to the other side via an SC-compat
IPC protocol.
The yielded `tuple` is a pair delivering a `tractor.Context`
and any first value "sent" by the "callee" task via a call
to `Context.started(<value: Any>)`; this side of the
context does not unblock until the "callee" task calls
`.started()` in similar style to `trio.Nursery.start()`.
When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be
retrieved using the `Contex.result()` api.
The yielded ``Context`` instance further allows for opening
bidirectional streams, explicit cancellation and
structurred-concurrency-synchronized final result-msg
collection. See ``tractor.Context`` for more details.
'''
__tracebackhide__: bool = hide_tb
# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)
):
raise TypeError(
f'{func} must be an async generator function!')
# TODO: i think from here onward should probably
# just be factored into an `@acm` inside a new
# a new `_context.py` mod.
nsf = NamespacePath.from_ref(func)
# XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing
# is not built for it.
if portal.channel.uid == portal.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
f'|_{portal.actor}\n'
)
ctx: Context = await portal.actor.start_remote_task(
portal.channel,
nsf=nsf,
kwargs=kwargs,
# NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does
# not open a stream until after some slow startup/init
# period, in which case when the first msg is read from
# the feeder mem chan, say when first calling
# `Context.open_stream(allow_overruns=True)`, the overrun condition will be
# raised before any ignoring of overflow msgs can take
# place..
allow_overruns=allow_overruns,
)
assert ctx._remote_func_type == 'context'
msg: dict = await ctx._recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first: Any = msg['started']
ctx._started_called: bool = True
except KeyError as src_error:
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
src_err=src_error,
log=log,
expect_key='started',
)
ctx._portal: Portal = portal
uid: tuple = portal.channel.uid
cid: str = ctx.cid
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with trio.open_nursery() as nurse:
# NOTE: used to start overrun queuing tasks
ctx._scope_nursery: trio.Nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
yield ctx, first
# ??TODO??: do we still want to consider this or is
# the `else:` block handling via a `.result()`
# call below enough??
# -[ ] pretty sure `.result()` internals do the
# same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we
# wouldn't have that duplication either by somehow
# factoring the `.result()` handler impl in a way
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
# NOTE: between the caller exiting and arriving
# here the far end may have sent a ctxc-msg or
# other error, so check for it here immediately
# and maybe raise so as to engage the ctxc
# handling block below!
#
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# ctxc_from_callee = maybe_ctxc
# when in allow_overruns mode there may be
# lingering overflow sender tasks remaining?
if nurse.child_tasks:
# XXX: ensure we are in overrun state
# with ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery!
if (
not ctx._allow_overruns
or len(nurse.child_tasks) > 1
):
raise InternalError(
'Context has sub-tasks but is '
'not in `allow_overruns=True` mode!?'
)
# ensure we cancel all overflow sender
# tasks started in the nursery when
# `._allow_overruns == True`.
#
# NOTE: this means `._scope.cancelled_caught`
# will prolly be set! not sure if that's
# non-ideal or not ???
ctx._scope.cancel()
# XXX NOTE XXX: maybe shield against
# self-context-cancellation (which raises a local
# `ContextCancelled`) when requested (via
# `Context.cancel()`) by the same task (tree) which entered
# THIS `.open_context()`.
#
# NOTE: There are 2 operating cases for a "graceful cancel"
# of a `Context`. In both cases any `ContextCancelled`
# raised in this scope-block came from a transport msg
# relayed from some remote-actor-task which our runtime set
# as to `Context._remote_error`
#
# the CASES:
#
# - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit
# without raising, ideally eventually receiving the ctxc
# ack msg thus resulting in `ctx.cancel_acked == True`.
#
# - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also
# setting our own ctx's "reason for cancel" to be that
# other context's cancellation condition; we set our
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# AGAIN to restate the above, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCanclled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc:
scope_err = ctxc
ctx._local_error: BaseException = scope_err
ctxc_from_callee = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will
# cause a SIGINT-ignoring HANG!
# -> prolly due to a stale debug lock entry..
# -[ ] USE `.stackscope` to demonstrate that (possibly
# documenting it as a definittive example of
# debugging the tractor-runtime itself using it's
# own `.devx.` tooling!
#
# await _debug.pause()
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
if (
ctx._cancel_called
and
ctxc is ctx._remote_error
and
ctxc.canceller == portal.actor.uid
):
log.cancel(
f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n'
f'{ctxc}'
)
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else
# -> probably a remote peer!
else:
raise
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except (
# CASE 3: standard local error in this caller/yieldee
Exception,
# CASES 1 & 2: can manifest as a `ctx._scope_nursery`
# exception-group of,
#
# 1.-`trio.Cancelled`s, since
# `._scope.cancel()` will have been called
# (transitively by the runtime calling
# `._deliver_msg()`) and any `ContextCancelled`
# eventually absorbed and thus absorbed/supressed in
# any `Context._maybe_raise_remote_err()` call.
#
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error delivered from the "callee" side
# AND a group-exc is only raised if there was > 1
# tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls
# `.result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively
# called and the remote error raised causing all
# tasks to be cancelled.
# NOTE: ^ this case always can happen if any
# overrun handler tasks were spawned!
BaseExceptionGroup,
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt,
) as caller_err:
scope_err = caller_err
ctx._local_error: BaseException = scope_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above ^^^ !!
# await _debug.pause()
log.cancel(
'Context terminated due to\n\n'
f'.outcome => {ctx.repr_outcome()}\n'
)
if debug_mode():
# async with _debug.acquire_debug_lock(portal.actor.uid):
# pass
# TODO: factor ^ into below for non-root cases?
#
from .devx import maybe_wait_for_debugger
was_acquired: bool = await maybe_wait_for_debugger(
header_msg=(
'Delaying `ctx.cancel()` until debug lock '
'acquired..\n'
),
)
if was_acquired:
log.pdb(
'Acquired debug lock! '
'Calling `ctx.cancel()`!\n'
)
# we don't need to cancel the callee if it already
# told us it's cancelled ;p
if ctxc_from_callee is None:
try:
await ctx.cancel()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
raise # duh
# no local scope error, the "clean exit with a result" case.
else:
if ctx.chan.connected():
log.runtime(
'Waiting on final context result for\n'
f'peer: {uid}\n'
f'|_{ctx._task}\n'
)
# XXX NOTE XXX: the below call to
# `Context.result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
try:
result_or_err: Exception|Any = await ctx.result()
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
# this task didn't know until final teardown
# / value collection.
scope_err = berr
ctx._local_error: BaseException = scope_err
raise
# yes! this worx Bp
# from .devx import _debug
# await _debug.pause()
# an exception type boxed in a `RemoteActorError`
# is returned (meaning it was obvi not raised)
# that we want to log-report on.
msgdata: str|None = getattr(
result_or_err,
'msgdata',
None
)
match (msgdata, result_or_err):
case (
{'tb_str': tbstr},
ContextCancelled(),
):
log.cancel(tbstr)
case (
{'tb_str': tbstr},
RemoteActorError(),
):
log.exception(
'Context remotely errored!\n'
f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
f'{tbstr}'
)
case (None, _):
log.runtime(
'Context returned final result from callee task:\n'
f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
f'`{result_or_err}`\n'
)
finally:
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
if debug_mode():
from .devx import maybe_wait_for_debugger
await maybe_wait_for_debugger()
# though it should be impossible for any tasks
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last
# to avoid premature stream clobbers.
if (
(rxchan := ctx._recv_chan)
# maybe TODO: yes i know the below check is
# touching `trio` memchan internals..BUT, there are
# only a couple ways to avoid a `trio.Cancelled`
# bubbling from the `.aclose()` call below:
#
# - catch and mask it via the cancel-scope-shielded call
# as we are rn (manual and frowned upon) OR,
# - specially handle the case where `scope_err` is
# one of {`BaseExceptionGroup`, `trio.Cancelled`}
# and then presume that the `.aclose()` call will
# raise a `trio.Cancelled` and just don't call it
# in those cases..
#
# that latter approach is more logic, LOC, and more
# convoluted so for now stick with the first
# psuedo-hack-workaround where we just try to avoid
# the shielded call as much as we can detect from
# the memchan's `._closed` state..
#
# XXX MOTIVATION XXX-> we generally want to raise
# any underlying actor-runtime/internals error that
# surfaces from a bug in tractor itself so it can
# be easily detected/fixed AND, we also want to
# minimize noisy runtime tracebacks (normally due
# to the cross-actor linked task scope machinery
# teardown) displayed to user-code and instead only
# displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in
# user/app code on either end of the context.
and not rxchan._closed
):
# XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask
# out any exception group or legit (remote) ctx
# error that sourced from the remote task or its
# runtime.
#
# NOTE: further, this should be the only place the
# underlying feeder channel is
# once-and-only-CLOSED!
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
# XXX: we always raise remote errors locally and
# generally speaking mask runtime-machinery related
# multi-`trio.Cancelled`s. As such, any `scope_error`
# which was the underlying cause of this context's exit
# should be stored as the `Context._local_error` and
# used in determining `Context.cancelled_caught: bool`.
if scope_err is not None:
# sanity, tho can remove?
assert ctx._local_error is scope_err
# ctx._local_error: BaseException = scope_err
# etype: Type[BaseException] = type(scope_err)
# CASE 2
if (
ctx._cancel_called
and ctx.cancel_acked
):
log.cancel(
'Context cancelled by caller task\n'
f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
)
# TODO: should we add a `._cancel_req_received`
# flag to determine if the callee manually called
# `ctx.cancel()`?
# -[ ] going to need a cid check no?
# CASE 1
else:
outcome_str: str = ctx.repr_outcome(
show_error_fields=True,
# type_only=True,
)
log.cancel(
f'Context terminated due to local scope error:\n\n'
f'{ctx.chan.uid} => {outcome_str}\n'
)
# FINALLY, remove the context from runtime tracking and
# exit!
log.runtime(
'Removing IPC ctx opened with peer\n'
f'{uid}\n'
f'|_{ctx}\n'
)
portal.actor._contexts.pop(
(uid, cid),
None,
)
def mk_context( def mk_context(
chan: Channel, chan: Channel,
cid: str, cid: str,


@ -19,13 +19,14 @@ Inter-process comms abstractions
""" """
from __future__ import annotations from __future__ import annotations
import struct
import platform
from pprint import pformat
from collections.abc import ( from collections.abc import (
AsyncGenerator, AsyncGenerator,
AsyncIterator, AsyncIterator,
) )
from contextlib import asynccontextmanager as acm
import platform
from pprint import pformat
import struct
import typing import typing
from typing import ( from typing import (
Any, Any,
@ -35,18 +36,16 @@ from typing import (
TypeVar, TypeVar,
) )
from tricycle import BufferedReceiveStream
import msgspec import msgspec
from tricycle import BufferedReceiveStream
import trio import trio
from async_generator import asynccontextmanager
from .log import get_logger from tractor.log import get_logger
from ._exceptions import TransportClosed from tractor._exceptions import TransportClosed
log = get_logger(__name__) log = get_logger(__name__)
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
log = get_logger(__name__)
def get_stream_addrs(stream: trio.SocketStream) -> tuple: def get_stream_addrs(stream: trio.SocketStream) -> tuple:
@ -206,7 +205,17 @@ class MsgpackTCPStream(MsgTransport):
else: else:
raise raise
async def send(self, msg: Any) -> None: async def send(
self,
msg: Any,
# hide_tb: bool = False,
) -> None:
'''
Send a msgpack coded blob-as-msg over TCP.
'''
# __tracebackhide__: bool = hide_tb
async with self._send_lock: async with self._send_lock:
bytes_data: bytes = self.encode(msg) bytes_data: bytes = self.encode(msg)
@ -388,15 +397,28 @@ class Channel:
) )
return transport return transport
async def send(self, item: Any) -> None: async def send(
self,
payload: Any,
# hide_tb: bool = False,
) -> None:
'''
Send a coded msg-blob over the transport.
'''
# __tracebackhide__: bool = hide_tb
log.transport( log.transport(
'=> send IPC msg:\n\n' '=> send IPC msg:\n\n'
f'{pformat(item)}\n' f'{pformat(payload)}\n'
) # type: ignore ) # type: ignore
assert self._transport assert self._transport
await self._transport.send(item) await self._transport.send(
payload,
# hide_tb=hide_tb,
)
async def recv(self) -> Any: async def recv(self) -> Any:
assert self._transport assert self._transport
@ -493,7 +515,7 @@ class Channel:
return self._transport.connected() if self._transport else False return self._transport.connected() if self._transport else False
@asynccontextmanager @acm
async def _connect_chan( async def _connect_chan(
host: str, host: str,
port: int port: int

View File

@ -24,6 +24,7 @@ OS processes, possibly on different (hardware) hosts.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager as acm
import importlib import importlib
import inspect import inspect
from typing import ( from typing import (
@ -37,30 +38,21 @@ from dataclasses import dataclass
import warnings import warnings
import trio import trio
from async_generator import asynccontextmanager
from .trionics import maybe_open_nursery from .trionics import maybe_open_nursery
from .devx import (
# _debug,
maybe_wait_for_debugger,
)
from ._state import ( from ._state import (
current_actor, current_actor,
debug_mode,
) )
from ._ipc import Channel from ._ipc import Channel
from .log import get_logger from .log import get_logger
from .msg import NamespacePath from .msg import NamespacePath
from ._exceptions import ( from ._exceptions import (
InternalError,
_raise_from_no_key_in_msg,
unpack_error, unpack_error,
NoResult, NoResult,
ContextCancelled,
RemoteActorError,
) )
from ._context import ( from ._context import (
Context, Context,
open_context_from_portal,
) )
from ._streaming import ( from ._streaming import (
MsgStream, MsgStream,
@ -392,7 +384,7 @@ class Portal:
self.channel, self.channel,
) )
@asynccontextmanager @acm
async def open_stream_from( async def open_stream_from(
self, self,
async_gen_func: Callable, # typing: ignore async_gen_func: Callable, # typing: ignore
@ -449,541 +441,12 @@ class Portal:
# await recv_chan.aclose() # await recv_chan.aclose()
self._streams.remove(rchan) self._streams.remove(rchan)
# TODO: move this impl to `._context` mod and # NOTE: impl is found in `._context`` mod to make
# instead just bind it here as a method so that the logic # reading/groking the details simpler code-org-wise. This
# for ctx stuff stays all in one place (instead of frickin # method does not have to be used over that `@acm` module func
# having to open this file in tandem every gd time!!! XD) # directly, it is for conventience and from the original API
# # design.
@asynccontextmanager open_context = open_context_from_portal
async def open_context(
self,
func: Callable,
allow_overruns: bool = False,
# TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show?
hide_tb: bool = False,
# proxied to RPC
**kwargs,
) -> AsyncGenerator[tuple[Context, Any], None]:
'''
Open an inter-actor "task context"; a remote task is
scheduled and cancel-scope-state-linked to a `trio.run()` across
memory boundaries in another actor's runtime.
This is an `@acm` API which allows for deterministic setup
and teardown of a remotely scheduled task in another remote
actor. Once opened, the 2 now "linked" tasks run completely
in parallel in each actor's runtime with their enclosing
`trio.CancelScope`s kept in a synced state wherein if
either side errors or cancels an equivalent error is
relayed to the other side via an SC-compat IPC protocol.
The yielded `tuple` is a pair delivering a `tractor.Context`
and any first value "sent" by the "callee" task via a call
to `Context.started(<value: Any>)`; this side of the
context does not unblock until the "callee" task calls
`.started()` in similar style to `trio.Nursery.start()`.
When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be
retrieved using the `Contex.result()` api.
The yielded ``Context`` instance further allows for opening
bidirectional streams, explicit cancellation and
structurred-concurrency-synchronized final result-msg
collection. See ``tractor.Context`` for more details.
'''
__tracebackhide__: bool = hide_tb
# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)
):
raise TypeError(
f'{func} must be an async generator function!')
# TODO: i think from here onward should probably
# just be factored into an `@acm` inside a new
# a new `_context.py` mod.
nsf = NamespacePath.from_ref(func)
# XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing
# is not built for it.
if self.channel.uid == self.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
f'|_{self.actor}\n'
)
ctx: Context = await self.actor.start_remote_task(
self.channel,
nsf=nsf,
kwargs=kwargs,
# NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does
# not open a stream until after some slow startup/init
# period, in which case when the first msg is read from
# the feeder mem chan, say when first calling
# `Context.open_stream(allow_overruns=True)`, the overrun condition will be
# raised before any ignoring of overflow msgs can take
# place..
allow_overruns=allow_overruns,
)
assert ctx._remote_func_type == 'context'
msg: dict = await ctx._recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first: Any = msg['started']
ctx._started_called: bool = True
except KeyError as src_error:
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
src_err=src_error,
log=log,
expect_key='started',
)
ctx._portal: Portal = self
uid: tuple = self.channel.uid
cid: str = ctx.cid
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with trio.open_nursery() as nurse:
# NOTE: used to start overrun queuing tasks
ctx._scope_nursery: trio.Nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
yield ctx, first
# ??TODO??: do we still want to consider this or is
# the `else:` block handling via a `.result()`
# call below enough??
# -[ ] pretty sure `.result()` internals do the
# same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we
# wouldn't have that duplication either by somehow
# factoring the `.result()` handler impl in a way
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
# NOTE: between the caller exiting and arriving
# here the far end may have sent a ctxc-msg or
# other error, so check for it here immediately
# and maybe raise so as to engage the ctxc
# handling block below!
#
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# ctxc_from_callee = maybe_ctxc
# when in allow_overruns mode there may be
# lingering overflow sender tasks remaining?
if nurse.child_tasks:
# XXX: ensure we are in overrun state
# with ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery!
if (
not ctx._allow_overruns
or len(nurse.child_tasks) > 1
):
raise InternalError(
'Context has sub-tasks but is '
'not in `allow_overruns=True` mode!?'
)
# ensure we cancel all overflow sender
# tasks started in the nursery when
# `._allow_overruns == True`.
#
# NOTE: this means `._scope.cancelled_caught`
# will prolly be set! not sure if that's
# non-ideal or not ???
ctx._scope.cancel()
# XXX NOTE XXX: maybe shield against
# self-context-cancellation (which raises a local
# `ContextCancelled`) when requested (via
# `Context.cancel()`) by the same task (tree) which entered
# THIS `.open_context()`.
#
# NOTE: There are 2 operating cases for a "graceful cancel"
# of a `Context`. In both cases any `ContextCancelled`
# raised in this scope-block came from a transport msg
# relayed from some remote-actor-task which our runtime set
# as to `Context._remote_error`
#
# the CASES:
#
# - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit
# without raising, ideally eventually receiving the ctxc
# ack msg thus resulting in `ctx.cancel_acked == True`.
#
# - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also
# setting our own ctx's "reason for cancel" to be that
# other context's cancellation condition; we set our
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# AGAIN to restate the above, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCancelled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc:
scope_err = ctxc
ctx._local_error: BaseException = scope_err
ctxc_from_callee = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will
# cause a SIGINT-ignoring HANG!
# -> prolly due to a stale debug lock entry..
# -[ ] USE `.stackscope` to demonstrate that (possibly
# documenting it as a definitive example of
# debugging the tractor-runtime itself using it's
# own `.devx.` tooling!
#
# await _debug.pause()
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
if (
ctx._cancel_called
and
ctxc is ctx._remote_error
and
ctxc.canceller == self.actor.uid
):
log.cancel(
f'Context (cid=[{ctx.cid[-6:]}..]) cancelled gracefully with:\n'
f'{ctxc}'
)
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else
# -> probably a remote peer!
else:
raise
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except (
# CASE 3: standard local error in this caller/yieldee
Exception,
# CASES 1 & 2: can manifest as a `ctx._scope_nursery`
# exception-group of,
#
# 1.-`trio.Cancelled`s, since
# `._scope.cancel()` will have been called
# (transitively by the runtime calling
# `._deliver_msg()`) and any `ContextCancelled`
# eventually absorbed and thus suppressed in
# any `Context._maybe_raise_remote_err()` call.
#
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error delivered from the "callee" side
# AND a group-exc is only raised if there was > 1
# tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls
# `.result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively
# called and the remote error raised causing all
# tasks to be cancelled.
# NOTE: ^ this case always can happen if any
# overrun handler tasks were spawned!
BaseExceptionGroup,
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt,
) as caller_err:
scope_err = caller_err
ctx._local_error: BaseException = scope_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above ^^^ !!
# await _debug.pause()
log.cancel(
'Context terminated due to\n\n'
f'.outcome => {ctx.repr_outcome()}\n'
)
if debug_mode():
# async with _debug.acquire_debug_lock(self.actor.uid):
# pass
# TODO: factor ^ into below for non-root cases?
was_acquired: bool = await maybe_wait_for_debugger(
header_msg=(
'Delaying `ctx.cancel()` until debug lock '
'acquired..\n'
),
)
if was_acquired:
log.pdb(
'Acquired debug lock! '
'Calling `ctx.cancel()`!\n'
)
# we don't need to cancel the callee if it already
# told us it's cancelled ;p
if ctxc_from_callee is None:
try:
await ctx.cancel()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
raise # duh
# no local scope error, the "clean exit with a result" case.
else:
if ctx.chan.connected():
log.runtime(
'Waiting on final context result for\n'
f'peer: {uid}\n'
f'|_{ctx._task}\n'
)
# XXX NOTE XXX: the below call to
# `Context.result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
try:
result_or_err: Exception|Any = await ctx.result()
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
# this task didn't know until final teardown
# / value collection.
scope_err = berr
ctx._local_error: BaseException = scope_err
raise
# yes! this worx Bp
# from .devx import _debug
# await _debug.pause()
# an exception type boxed in a `RemoteActorError`
# is returned (meaning it was obvi not raised)
# that we want to log-report on.
msgdata: str|None = getattr(
result_or_err,
'msgdata',
None
)
match (msgdata, result_or_err):
case (
{'tb_str': tbstr},
ContextCancelled(),
):
log.cancel(tbstr)
case (
{'tb_str': tbstr},
RemoteActorError(),
):
log.exception(
'Context remotely errored!\n'
f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
f'{tbstr}'
)
case (None, _):
log.runtime(
'Context returned final result from callee task:\n'
f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
f'`{result_or_err}`\n'
)
finally:
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
await maybe_wait_for_debugger()
# though it should be impossible for any tasks
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last
# to avoid premature stream clobbers.
if (
(rxchan := ctx._recv_chan)
# maybe TODO: yes i know the below check is
# touching `trio` memchan internals..BUT, there are
# only a couple ways to avoid a `trio.Cancelled`
# bubbling from the `.aclose()` call below:
#
# - catch and mask it via the cancel-scope-shielded call
# as we are rn (manual and frowned upon) OR,
# - specially handle the case where `scope_err` is
# one of {`BaseExceptionGroup`, `trio.Cancelled`}
# and then presume that the `.aclose()` call will
# raise a `trio.Cancelled` and just don't call it
# in those cases..
#
# that latter approach is more logic, LOC, and more
# convoluted so for now stick with the first
# pseudo-hack-workaround where we just try to avoid
# the shielded call as much as we can detect from
# the memchan's `._closed` state..
#
# XXX MOTIVATION XXX-> we generally want to raise
# any underlying actor-runtime/internals error that
# surfaces from a bug in tractor itself so it can
# be easily detected/fixed AND, we also want to
# minimize noisy runtime tracebacks (normally due
# to the cross-actor linked task scope machinery
# teardown) displayed to user-code and instead only
# displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in
# user/app code on either end of the context.
and not rxchan._closed
):
# XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask
# out any exception group or legit (remote) ctx
# error that sourced from the remote task or its
# runtime.
#
# NOTE: further, this should be the only place the
# underlying feeder channel is
# once-and-only-CLOSED!
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
# XXX: we always raise remote errors locally and
# generally speaking mask runtime-machinery related
# multi-`trio.Cancelled`s. As such, any `scope_error`
# which was the underlying cause of this context's exit
# should be stored as the `Context._local_error` and
# used in determining `Context.cancelled_caught: bool`.
if scope_err is not None:
# sanity, tho can remove?
assert ctx._local_error is scope_err
# ctx._local_error: BaseException = scope_err
# etype: Type[BaseException] = type(scope_err)
# CASE 2
if (
ctx._cancel_called
and ctx.cancel_acked
):
log.cancel(
'Context cancelled by caller task\n'
f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
)
# TODO: should we add a `._cancel_req_received`
# flag to determine if the callee manually called
# `ctx.cancel()`?
# -[ ] going to need a cid check no?
# CASE 1
else:
outcome_str: str = ctx.repr_outcome(
show_error_fields=True,
# type_only=True,
)
log.cancel(
f'Context terminated due to local scope error:\n\n'
f'{ctx.chan.uid} => {outcome_str}\n'
)
# FINALLY, remove the context from runtime tracking and
# exit!
log.runtime(
'Removing IPC ctx opened with peer\n'
f'{uid}\n'
f'|_{ctx}\n'
)
self.actor._contexts.pop(
(uid, cid),
None,
)
@dataclass
@@ -1014,7 +477,7 @@ class LocalPortal:
return await func(**kwargs)
- @asynccontextmanager
+ @acm
async def open_portal(
channel: Channel,
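The long removed block above is the old `Portal.open_context()` body. For reference, a minimal caller/callee sketch of the public API it implements; the actor and function names here are illustrative, not part of this diff:

import trio
import tractor


@tractor.context
async def echo_once(
    ctx: tractor.Context,
    msg: str,
) -> str:
    # the value passed to `.started()` becomes `first` on the opener side
    await ctx.started('ready')
    return msg


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',
            enable_modules=[__name__],
        )
        # enter-tuple is `(ctx, first)` as yielded by the impl above
        async with portal.open_context(
            echo_once,
            msg='hello',
        ) as (ctx, first):
            assert first == 'ready'
            # final result-msg collection, same call the exit path makes
            assert await ctx.result() == 'hello'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)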

View File

@@ -315,37 +315,18 @@ async def _errors_relayed_via_ipc(
if not entered_debug:
log.exception('Actor crashed:\n')
- # always ship errors back to caller
+ # always (try to) ship RPC errors back to caller
- err_msg: dict[str, dict] = pack_error(
- err,
- # tb=tb, # TODO: special tb fmting?
- cid=ctx.cid,
- )
- # NOTE: the src actor should always be packed into the
- # error.. but how should we verify this?
- # assert err_msg['src_actor_uid']
- # if not err_msg['error'].get('src_actor_uid'):
- #     import pdbp; pdbp.set_trace()
if is_rpc:
- try:
+ #
- await chan.send(err_msg)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
- except (
+ await try_ship_error_to_remote(
- trio.ClosedResourceError,
+ chan,
- trio.BrokenResourceError,
+ err,
- BrokenPipeError,
+ cid=ctx.cid,
- ) as ipc_err:
+ remote_descr='caller',
+ hide_tb=hide_tb,
- # if we can't propagate the error that's a big boo boo
- log.exception(
- f"Failed to ship error to caller @ {chan.uid} !?\n"
- f'{ipc_err}'
- )
- )
+ )
# error is probably from above coro running code *not from
@@ -719,10 +700,14 @@ def _get_mod_abspath(module: ModuleType) -> str:
return os.path.abspath(module.__file__)
- async def try_ship_error_to_parent(
+ async def try_ship_error_to_remote(
channel: Channel,
err: Exception|BaseExceptionGroup,
+ cid: str|None = None,
+ remote_descr: str = 'parent',
+ hide_tb: bool = True,
) -> None:
'''
Box, pack and encode a local runtime(-internal) exception for
@@ -730,22 +715,39 @@ async def try_ship_error_to_parent(
local cancellation ignored but logged as critical(ly bad).
'''
+ __tracebackhide__: bool = hide_tb
with CancelScope(shield=True):
try:
- await channel.send(
# NOTE: normally only used for internal runtime errors
# so ship to peer actor without a cid.
- pack_error(err)
+ msg: dict = pack_error(
+ err,
+ cid=cid,
+ # TODO: special tb fmting for ctxc cases?
+ # tb=tb,
- )
+ )
+ # NOTE: the src actor should always be packed into the
+ # error.. but how should we verify this?
+ # actor: Actor = _state.current_actor()
+ # assert err_msg['src_actor_uid']
+ # if not err_msg['error'].get('src_actor_uid'):
+ #     import pdbp; pdbp.set_trace()
+ await channel.send(msg)
+ # XXX NOTE XXX in SC terms this is one of the worst things
+ # that can happen and provides for a 2-general's dilemma..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
+ BrokenPipeError,
):
- # in SC terms this is one of the worst things that can
+ err_msg: dict = msg['error']['tb_str']
- # happen and provides for a 2-general's dilemma..
log.critical(
- f'Failed to ship error to parent '
+ 'IPC transport failure -> '
- f'{channel.uid}, IPC transport failure!'
+ f'failed to ship error to {remote_descr}!\n\n'
+ f'X=> {channel.uid}\n\n'
+ f'{err_msg}\n'
)
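The renamed `try_ship_error_to_remote()` above centralizes the box-the-error, ship-it, and tolerate-a-dead-transport pattern used by both the RPC invoker and the actor core. A stripped-down sketch of that shape using plain `trio` primitives; the function name and message layout here are illustrative, not the actual runtime internals:

import trio


async def ship_error_best_effort(
    send_chan: trio.abc.SendChannel,
    err: BaseException,
    remote_descr: str = 'parent',
) -> None:
    # shield so an in-flight cancellation can't interrupt this
    # last-ditch attempt to report the failure to the peer.
    with trio.CancelScope(shield=True):
        try:
            await send_chan.send(
                {'error': {'tb_str': repr(err)}}
            )
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
        ):
            # transport already gone: the 2-generals case, nothing
            # left to do but log it loudly on this side.
            print(f'failed to ship error to {remote_descr}: {err!r}')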
@@ -954,7 +956,10 @@ class Actor:
log.runtime(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
- def load_modules(self) -> None:
+ def load_modules(
+ self,
+ debug_mode: bool = False,
+ ) -> None:
'''
Load allowed RPC modules locally (after fork).
@@ -986,7 +991,9 @@ class Actor:
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
- log.error(f"Failed to import {modpath} in {self.name}")
+ log.error(
+ f"Failed to import {modpath} in {self.name}"
+ )
raise
def _get_rpc_func(self, ns, funcname):
@@ -1836,7 +1843,7 @@ class Actor:
log.cancel(
'Cancel request for RPC task\n\n'
- f'<= Actor.cancel_task(): {requesting_uid}\n\n'
+ f'<= Actor._cancel_task(): {requesting_uid}\n\n'
f'=> {ctx._task}\n'
f' |_ >> {ctx.repr_rpc}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n'
@@ -2117,11 +2124,6 @@ async def async_main(
):
accept_addrs = set_accept_addr_says_rent
- # load exposed/allowed RPC modules
- # XXX: do this **after** establishing a channel to the parent
- # but **before** starting the message loop for that channel
- # such that import errors are properly propagated upwards
- actor.load_modules()
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
@@ -2139,6 +2141,24 @@
actor._service_n = service_nursery
assert actor._service_n
+ # load exposed/allowed RPC modules
+ # XXX: do this **after** establishing a channel to the parent
+ # but **before** starting the message loop for that channel
+ # such that import errors are properly propagated upwards
+ actor.load_modules()
+ # XXX TODO XXX: figuring out debugging of this
+ # would somewhat guarantee "self-hosted" runtime
+ # debugging (since it hits all the edge cases?)
+ #
+ # `tractor.pause()` right?
+ # try:
+ #     actor.load_modules()
+ # except ModuleNotFoundError as err:
+ #     _debug.pause_from_sync()
+ #     import pdbp; pdbp.set_trace()
+ #     raise
# Start up the transport(-channel) server with,
# - subactor: the bind address is sent by our parent
# over our established channel
@@ -2258,7 +2278,7 @@ async def async_main(
)
if actor._parent_chan:
- await try_ship_error_to_parent(
+ await try_ship_error_to_remote(
actor._parent_chan,
err,
)
@@ -2674,7 +2694,7 @@ async def process_messages(
log.exception("Actor errored:")
if actor._parent_chan:
- await try_ship_error_to_parent(
+ await try_ship_error_to_remote(
actor._parent_chan,
err,
)

View File

@@ -215,7 +215,7 @@ async def cancel_on_completion(
async def hard_kill(
proc: trio.Process,
- terminate_after: int = 3,
+ terminate_after: int = 1.6,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
@@ -281,8 +281,11 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
+ # TODO: toss in the skynet-logo face as ascii art?
log.critical(
- 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
+ # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
+ '#T-800 deployed to collect zombie B0\n'
+ f'|\n'
f'|_{proc}\n'
)
proc.kill()
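The `hard_kill()` change above only shortens the grace period before SIGKILL; the overall shape is a timeout-bounded wait on the child followed by a forced kill, roughly as in this sketch of the pattern (not the real `_spawn.py` body):

import trio


async def hard_kill(
    proc: trio.Process,
    terminate_after: float = 1.6,
) -> None:
    # ask politely first..
    proc.terminate()

    with trio.move_on_after(terminate_after) as cs:
        await proc.wait()

    if cs.cancelled_caught:
        # grace period expired -> T-800 time.
        proc.kill()
        await proc.wait()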

View File

@@ -114,13 +114,19 @@ class MsgStream(trio.abc.Channel):
stream=self,
)
- async def receive(self):
+ async def receive(
+ self,
+ hide_tb: bool = True,
+ ):
'''
Receive a single msg from the IPC transport, the next in
sequence sent by the far end task (possibly in order as
determined by the underlying protocol).
'''
+ __tracebackhide__: bool = hide_tb
# NOTE: `trio.ReceiveChannel` implements
# EOC handling as follows (aka uses it
# to gracefully exit async for loops):
@@ -139,7 +145,7 @@ class MsgStream(trio.abc.Channel):
if self._closed:
raise self._closed
- src_err: Exception|None = None
+ src_err: Exception|None = None  # orig tb
try:
try:
msg = await self._rx_chan.receive()
@@ -186,7 +192,7 @@ class MsgStream(trio.abc.Channel):
# TODO: Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically.
- # One we have broadcast support, we **don't** want to be
+ # Once we have broadcast support, we **don't** want to be
# closing this stream and not flushing a final value to
# remaining (clone) consumers who may not have been
# scheduled to receive it yet.
@@ -237,7 +243,12 @@ class MsgStream(trio.abc.Channel):
raise_ctxc_from_self_call=True,
)
- raise src_err # propagate
+ # propagate any error but hide low-level frames from
+ # caller by default.
+ if hide_tb:
+ raise type(src_err)(*src_err.args) from src_err
+ else:
+ raise src_err
async def aclose(self) -> list[Exception|dict]:
'''
@@ -475,23 +486,39 @@
async def send(
self,
- data: Any
+ data: Any,
+ hide_tb: bool = True,
) -> None:
'''
Send a message over this stream to the far end.
'''
- if self._ctx._remote_error:
+ __tracebackhide__: bool = hide_tb
- raise self._ctx._remote_error  # from None
+ self._ctx.maybe_raise()
if self._closed:
raise self._closed
- # raise trio.ClosedResourceError('This stream was already closed')
- await self._ctx.chan.send({
+ try:
+ await self._ctx.chan.send(
+ payload={
'yield': data,
'cid': self._ctx.cid,
- })
+ },
+ # hide_tb=hide_tb,
+ )
+ except (
+ trio.ClosedResourceError,
+ trio.BrokenResourceError,
+ BrokenPipeError,
+ ) as trans_err:
+ if hide_tb:
+ raise type(trans_err)(
+ *trans_err.args
+ ) from trans_err
+ else:
+ raise
def stream(func: Callable) -> Callable:
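Both `.send()` and `.receive()` now re-raise transport errors `from` the original so application code sees a shallow traceback while the low-level frames stay reachable via `__cause__`. The core idiom in isolation; the wrapper name here is illustrative:

import trio


async def send_with_shallow_tb(
    chan,
    payload: dict,
    hide_tb: bool = True,
) -> None:
    __tracebackhide__: bool = hide_tb  # also hides this frame under pytest
    try:
        await chan.send(payload)
    except (
        trio.ClosedResourceError,
        trio.BrokenResourceError,
        BrokenPipeError,
    ) as trans_err:
        if hide_tb:
            # re-raise a fresh instance "from" the original: the deep
            # frames live on `.__cause__` instead of sitting at the top
            # of the user-facing traceback.
            raise type(trans_err)(*trans_err.args) from trans_err
        raise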

View File

@@ -533,10 +533,8 @@ async def open_nursery(
'''
implicit_runtime: bool = False
- actor: Actor = current_actor(
+ actor: Actor = current_actor(err_on_no_runtime=False)
- err_on_no_runtime=False
+ an: ActorNursery|None = None
- )
try:
if (
actor is None

View File

@@ -0,0 +1,74 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Various helpers/utils for auditing your `tractor` app and/or the
core runtime.
'''
from contextlib import asynccontextmanager as acm
import pathlib
import tractor
from .pytest import (
tractor_test as tractor_test
)
def repodir() -> pathlib.Path:
'''
Return the abspath to the repo directory.
'''
# 2 parents up to step up through tests/<repo_dir>
return pathlib.Path(
__file__
# 3 .parents bc:
# <._testing-pkg>.<tractor-pkg>.<git-repo-dir>
# /$HOME/../<tractor-repo-dir>/tractor/_testing/__init__.py
).parent.parent.parent.absolute()
def examples_dir() -> pathlib.Path:
'''
Return the abspath to the examples directory as `pathlib.Path`.
'''
return repodir() / 'examples'
@acm
async def expect_ctxc(
yay: bool,
reraise: bool = False,
) -> None:
'''
Small acm to catch `ContextCancelled` errors when expected
below it in an `async with ()` block.
'''
if yay:
try:
yield
raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled:
if reraise:
raise
else:
return
else:
yield
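Typical use of the new helper in a test: wrap whatever awaitable is expected to (maybe) raise the cancellation, e.g. the final `.result()` collection. The wrapper function and flag names below are illustrative:

import tractor
from tractor._testing import expect_ctxc


async def collect_final_result(
    ctx: tractor.Context,
    expect_cancel: bool,
):
    # with `yay=True` a raised `ContextCancelled` is absorbed (or
    # re-raised when `reraise=True`); if none shows up the helper
    # raises `RuntimeError('Never raised ctxc?')` to fail the test.
    async with expect_ctxc(yay=expect_cancel):
        return await ctx.result()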

View File

@@ -0,0 +1,113 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`pytest` utils helpers and plugins for testing `tractor`'s runtime
and applications.
'''
from functools import (
partial,
wraps,
)
import inspect
import platform
import tractor
import trio
def tractor_test(fn):
'''
Decorator for async test funcs to present them as "native"
looking sync funcs runnable by `pytest` using `trio.run()`.
Use:
@tractor_test
async def test_whatever():
await ...
If fixtures:
- ``reg_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
'''
@wraps(fn)
def wrapper(
*args,
loglevel=None,
reg_addr=None,
start_method: str|None = None,
debug_mode: bool = False,
**kwargs
):
# __tracebackhide__ = True
# NOTE: inject any test-func declared fixture
# names by manually checking!
if 'reg_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['reg_addr'] = reg_addr
if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['start_method'] = start_method
if 'debug_mode' in inspect.signature(fn).parameters:
# the debug mode fixture flag
kwargs['debug_mode'] = debug_mode
if kwargs:
# use explicit root actor start
async def _main():
async with tractor.open_root_actor(
# **kwargs,
registry_addrs=[reg_addr] if reg_addr else None,
loglevel=loglevel,
start_method=start_method,
# TODO: only enable when pytest is passed --pdb
debug_mode=debug_mode,
):
await fn(*args, **kwargs)
main = _main
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs)
return trio.run(main)
return wrapper
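A sketch of the decorator paired with the fixtures it knows about; this assumes `reg_addr` and `loglevel` fixtures are defined in the test suite's `conftest.py` as described in the docstring above:

import tractor
from tractor._testing import tractor_test


@tractor_test
async def test_root_runtime_is_up(
    reg_addr,   # registry-addr fixture, forwarded to `open_root_actor()`
    loglevel,   # log-level fixture, likewise forwarded
):
    # since at least one known fixture is declared, the wrapper
    # pre-enters `tractor.open_root_actor()` before running the test
    # body, so the local runtime is already available here.
    assert tractor.current_actor() is not None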