Compare commits
1 Commits
f7469442e3
...
547b957bbf
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 547b957bbf |
|
@ -1,81 +0,0 @@
|
||||||
'''
|
|
||||||
Verify we can dump a `stackscope` tree on a hang.
|
|
||||||
|
|
||||||
'''
|
|
||||||
import os
|
|
||||||
import signal
|
|
||||||
|
|
||||||
import trio
|
|
||||||
import tractor
|
|
||||||
|
|
||||||
@tractor.context
|
|
||||||
async def start_n_shield_hang(
|
|
||||||
ctx: tractor.Context,
|
|
||||||
):
|
|
||||||
# actor: tractor.Actor = tractor.current_actor()
|
|
||||||
|
|
||||||
# sync to parent-side task
|
|
||||||
await ctx.started(os.getpid())
|
|
||||||
|
|
||||||
print('Entering shield sleep..')
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await trio.sleep_forever() # in subactor
|
|
||||||
|
|
||||||
# XXX NOTE ^^^ since this shields, we expect
|
|
||||||
# the zombie reaper (aka T800) to engage on
|
|
||||||
# SIGINT from the user and eventually hard-kill
|
|
||||||
# this subprocess!
|
|
||||||
|
|
||||||
|
|
||||||
async def main(
|
|
||||||
from_test: bool = False,
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
async with (
|
|
||||||
tractor.open_nursery(
|
|
||||||
debug_mode=True,
|
|
||||||
enable_stack_on_sig=True,
|
|
||||||
# maybe_enable_greenback=False,
|
|
||||||
loglevel='devx',
|
|
||||||
) as an,
|
|
||||||
):
|
|
||||||
|
|
||||||
ptl: tractor.Portal = await an.start_actor(
|
|
||||||
'hanger',
|
|
||||||
enable_modules=[__name__],
|
|
||||||
debug_mode=True,
|
|
||||||
)
|
|
||||||
async with ptl.open_context(
|
|
||||||
start_n_shield_hang,
|
|
||||||
) as (ctx, cpid):
|
|
||||||
|
|
||||||
_, proc, _ = an._children[ptl.chan.uid]
|
|
||||||
assert cpid == proc.pid
|
|
||||||
|
|
||||||
print(
|
|
||||||
'Yo my child hanging..?\n'
|
|
||||||
'Sending SIGUSR1 to see a tree-trace!\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX simulate the wrapping test's "user actions"
|
|
||||||
# (i.e. if a human didn't run this manually but wants to
|
|
||||||
# know what they should do to reproduce test behaviour)
|
|
||||||
if from_test:
|
|
||||||
os.kill(
|
|
||||||
cpid,
|
|
||||||
signal.SIGUSR1,
|
|
||||||
)
|
|
||||||
|
|
||||||
# simulate user cancelling program
|
|
||||||
await trio.sleep(0.5)
|
|
||||||
os.kill(
|
|
||||||
os.getpid(),
|
|
||||||
signal.SIGINT,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# actually let user send the ctl-c
|
|
||||||
await trio.sleep_forever() # in root
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
trio.run(main)
|
|
|
@ -21,11 +21,9 @@ import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import (
|
from tractor import (
|
||||||
current_actor,
|
current_actor,
|
||||||
Actor,
|
|
||||||
to_asyncio,
|
to_asyncio,
|
||||||
RemoteActorError,
|
RemoteActorError,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
_state,
|
|
||||||
)
|
)
|
||||||
from tractor.trionics import BroadcastReceiver
|
from tractor.trionics import BroadcastReceiver
|
||||||
from tractor._testing import expect_ctxc
|
from tractor._testing import expect_ctxc
|
||||||
|
@ -82,16 +80,7 @@ async def asyncio_actor(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# ensure internal runtime state is consistent
|
assert tractor.current_actor().is_infected_aio()
|
||||||
actor: Actor = tractor.current_actor()
|
|
||||||
assert (
|
|
||||||
actor.is_infected_aio()
|
|
||||||
and
|
|
||||||
actor._infected_aio
|
|
||||||
and
|
|
||||||
_state._runtime_vars['_is_infected_aio']
|
|
||||||
)
|
|
||||||
|
|
||||||
target: Callable = globals()[target]
|
target: Callable = globals()[target]
|
||||||
|
|
||||||
if '.' in expect_err:
|
if '.' in expect_err:
|
||||||
|
@ -147,7 +136,7 @@ def test_aio_simple_error(reg_addr):
|
||||||
assert err
|
assert err
|
||||||
|
|
||||||
assert isinstance(err, RemoteActorError)
|
assert isinstance(err, RemoteActorError)
|
||||||
assert err.boxed_type is AssertionError
|
assert err.boxed_type == AssertionError
|
||||||
|
|
||||||
|
|
||||||
def test_tractor_cancels_aio(reg_addr):
|
def test_tractor_cancels_aio(reg_addr):
|
||||||
|
|
|
@ -20,7 +20,6 @@ Sub-process entry points.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import multiprocessing as mp
|
|
||||||
import os
|
import os
|
||||||
import textwrap
|
import textwrap
|
||||||
from typing import (
|
from typing import (
|
||||||
|
@ -65,22 +64,20 @@ def _mp_main(
|
||||||
'''
|
'''
|
||||||
actor._forkserver_info = forkserver_info
|
actor._forkserver_info = forkserver_info
|
||||||
from ._spawn import try_set_start_method
|
from ._spawn import try_set_start_method
|
||||||
spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method)
|
spawn_ctx = try_set_start_method(start_method)
|
||||||
assert spawn_ctx
|
|
||||||
|
|
||||||
if actor.loglevel is not None:
|
if actor.loglevel is not None:
|
||||||
log.info(
|
log.info(
|
||||||
f'Setting loglevel for {actor.uid} to {actor.loglevel}'
|
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
||||||
)
|
|
||||||
get_console_log(actor.loglevel)
|
get_console_log(actor.loglevel)
|
||||||
|
|
||||||
# TODO: use scops headers like for `trio` below!
|
assert spawn_ctx
|
||||||
# (well after we libify it maybe..)
|
|
||||||
log.info(
|
log.info(
|
||||||
f'Started new {spawn_ctx.current_process()} for {actor.uid}'
|
f"Started new {spawn_ctx.current_process()} for {actor.uid}")
|
||||||
# f"parent_addr is {parent_addr}"
|
|
||||||
)
|
_state._current_actor = actor
|
||||||
_state._current_actor: Actor = actor
|
|
||||||
|
log.debug(f"parent_addr is {parent_addr}")
|
||||||
trio_main = partial(
|
trio_main = partial(
|
||||||
async_main,
|
async_main,
|
||||||
actor=actor,
|
actor=actor,
|
||||||
|
@ -97,9 +94,7 @@ def _mp_main(
|
||||||
pass # handle it the same way trio does?
|
pass # handle it the same way trio does?
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.info(
|
log.info(f"Subactor {actor.uid} terminated")
|
||||||
f'`mp`-subactor {actor.uid} exited'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
|
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
|
||||||
|
|
|
@ -59,7 +59,6 @@ import os
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from trio._core import _run as trio_runtime
|
|
||||||
from trio import (
|
from trio import (
|
||||||
CancelScope,
|
CancelScope,
|
||||||
Nursery,
|
Nursery,
|
||||||
|
@ -81,7 +80,6 @@ from ._context import (
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
InternalError,
|
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
|
@ -104,7 +102,6 @@ from ._rpc import (
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._supervise import ActorNursery
|
from ._supervise import ActorNursery
|
||||||
from trio._channel import MemoryChannelState
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
@ -900,15 +897,11 @@ class Actor:
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
f'cid:{cid}\n'
|
f'cid:{cid}\n'
|
||||||
)
|
)
|
||||||
ctx._allow_overruns: bool = allow_overruns
|
ctx._allow_overruns = allow_overruns
|
||||||
|
|
||||||
# adjust buffer size if specified
|
# adjust buffer size if specified
|
||||||
state: MemoryChannelState = ctx._send_chan._state # type: ignore
|
state = ctx._send_chan._state # type: ignore
|
||||||
if (
|
if msg_buffer_size and state.max_buffer_size != msg_buffer_size:
|
||||||
msg_buffer_size
|
|
||||||
and
|
|
||||||
state.max_buffer_size != msg_buffer_size
|
|
||||||
):
|
|
||||||
state.max_buffer_size = msg_buffer_size
|
state.max_buffer_size = msg_buffer_size
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -1102,36 +1095,7 @@ class Actor:
|
||||||
'`tractor.pause_from_sync()` not available!'
|
'`tractor.pause_from_sync()` not available!'
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX ensure the "infected `asyncio` mode" setting
|
rvs['_is_root'] = False
|
||||||
# passed down from our spawning parent is consistent
|
|
||||||
# with `trio`-runtime initialization:
|
|
||||||
# - during sub-proc boot, the entrypoint func
|
|
||||||
# (`._entry.<spawn_backend>_main()`) should set
|
|
||||||
# `._infected_aio = True` before calling
|
|
||||||
# `run_as_asyncio_guest()`,
|
|
||||||
# - the value of `infect_asyncio: bool = True` as
|
|
||||||
# passed to `ActorNursery.start_actor()` must be
|
|
||||||
# the same as `_runtime_vars['_is_infected_aio']`
|
|
||||||
if (
|
|
||||||
(aio_rtv := rvs['_is_infected_aio'])
|
|
||||||
!=
|
|
||||||
(aio_attr := self._infected_aio)
|
|
||||||
):
|
|
||||||
raise InternalError(
|
|
||||||
'Parent sent runtime-vars that mismatch for the '
|
|
||||||
'"infected `asyncio` mode" settings ?!?\n\n'
|
|
||||||
|
|
||||||
f'rvs["_is_infected_aio"] = {aio_rtv}\n'
|
|
||||||
f'self._infected_aio = {aio_attr}\n'
|
|
||||||
)
|
|
||||||
if aio_rtv:
|
|
||||||
assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest
|
|
||||||
# ^TODO^ possibly add a `sniffio` or
|
|
||||||
# `trio` pub-API for `is_guest_mode()`?
|
|
||||||
|
|
||||||
rvs['_is_root'] = False # obvi XD
|
|
||||||
|
|
||||||
# update process-wide globals
|
|
||||||
_state._runtime_vars.update(rvs)
|
_state._runtime_vars.update(rvs)
|
||||||
|
|
||||||
# XXX: ``msgspec`` doesn't support serializing tuples
|
# XXX: ``msgspec`` doesn't support serializing tuples
|
||||||
|
|
|
@ -44,8 +44,6 @@ _runtime_vars: dict[str, Any] = {
|
||||||
'_root_mailbox': (None, None),
|
'_root_mailbox': (None, None),
|
||||||
'_registry_addrs': [],
|
'_registry_addrs': [],
|
||||||
|
|
||||||
'_is_infected_aio': False,
|
|
||||||
|
|
||||||
# for `tractor.pause_from_sync()` & `breakpoint()` support
|
# for `tractor.pause_from_sync()` & `breakpoint()` support
|
||||||
'use_greenback': False,
|
'use_greenback': False,
|
||||||
}
|
}
|
||||||
|
@ -72,8 +70,7 @@ def current_actor(
|
||||||
'''
|
'''
|
||||||
if (
|
if (
|
||||||
err_on_no_runtime
|
err_on_no_runtime
|
||||||
and
|
and _current_actor is None
|
||||||
_current_actor is None
|
|
||||||
):
|
):
|
||||||
msg: str = 'No local actor has been initialized yet?\n'
|
msg: str = 'No local actor has been initialized yet?\n'
|
||||||
from ._exceptions import NoRuntime
|
from ._exceptions import NoRuntime
|
||||||
|
|
|
@ -158,7 +158,6 @@ class ActorNursery:
|
||||||
# configure and pass runtime state
|
# configure and pass runtime state
|
||||||
_rtv = _state._runtime_vars.copy()
|
_rtv = _state._runtime_vars.copy()
|
||||||
_rtv['_is_root'] = False
|
_rtv['_is_root'] = False
|
||||||
_rtv['_is_infected_aio'] = infect_asyncio
|
|
||||||
|
|
||||||
# allow setting debug policy per actor
|
# allow setting debug policy per actor
|
||||||
if debug_mode is not None:
|
if debug_mode is not None:
|
||||||
|
|
|
@ -276,10 +276,7 @@ def _run_asyncio_task(
|
||||||
chan._aio_task: asyncio.Task = task
|
chan._aio_task: asyncio.Task = task
|
||||||
|
|
||||||
# XXX TODO XXX get this actually workin.. XD
|
# XXX TODO XXX get this actually workin.. XD
|
||||||
# -[ ] we need logic to setup `greenback` for `asyncio`-side task
|
# maybe setup `greenback` for `asyncio`-side task REPLing
|
||||||
# REPLing.. which should normally be nearly the same as for
|
|
||||||
# `trio`?
|
|
||||||
# -[ ] add to a new `.devx._greenback.maybe_init_for_asyncio()`?
|
|
||||||
if (
|
if (
|
||||||
debug_mode()
|
debug_mode()
|
||||||
and
|
and
|
||||||
|
@ -308,22 +305,15 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
msg: str = (
|
msg: str = (
|
||||||
'Infected `asyncio` task {etype_str}\n'
|
'Infected `asyncio` task {etype_str}\n'
|
||||||
|
f'|_{task}\n'
|
||||||
)
|
)
|
||||||
if isinstance(terr, CancelledError):
|
if isinstance(terr, CancelledError):
|
||||||
msg += (
|
|
||||||
f'c)>\n'
|
|
||||||
f' |_{task}\n'
|
|
||||||
)
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
msg.format(etype_str='cancelled')
|
msg.format(etype_str='cancelled')
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
msg += (
|
|
||||||
f'x)>\n'
|
|
||||||
f' |_{task}\n'
|
|
||||||
)
|
|
||||||
log.exception(
|
log.exception(
|
||||||
msg.format(etype_str='errored')
|
msg.format(etype_str='cancelled')
|
||||||
)
|
)
|
||||||
|
|
||||||
assert type(terr) is type(aio_err), (
|
assert type(terr) is type(aio_err), (
|
||||||
|
@ -629,10 +619,9 @@ def run_as_asyncio_guest(
|
||||||
# )
|
# )
|
||||||
|
|
||||||
def trio_done_callback(main_outcome):
|
def trio_done_callback(main_outcome):
|
||||||
log.runtime(
|
log.info(
|
||||||
f'`trio` guest-run finishing with outcome\n'
|
f'trio_main finished with\n'
|
||||||
f'>) {main_outcome}\n'
|
f'|_{main_outcome!r}'
|
||||||
f'|_{trio_done_fute}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if isinstance(main_outcome, Error):
|
if isinstance(main_outcome, Error):
|
||||||
|
@ -654,12 +643,6 @@ def run_as_asyncio_guest(
|
||||||
else:
|
else:
|
||||||
trio_done_fute.set_result(main_outcome)
|
trio_done_fute.set_result(main_outcome)
|
||||||
|
|
||||||
log.info(
|
|
||||||
f'`trio` guest-run finished with outcome\n'
|
|
||||||
f')>\n'
|
|
||||||
f'|_{trio_done_fute}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
startup_msg += (
|
startup_msg += (
|
||||||
f'-> created {trio_done_callback!r}\n'
|
f'-> created {trio_done_callback!r}\n'
|
||||||
f'-> scheduling `trio_main`: {trio_main!r}\n'
|
f'-> scheduling `trio_main`: {trio_main!r}\n'
|
||||||
|
@ -698,8 +681,7 @@ def run_as_asyncio_guest(
|
||||||
# error path in `asyncio`'s runtime..?
|
# error path in `asyncio`'s runtime..?
|
||||||
asyncio.CancelledError,
|
asyncio.CancelledError,
|
||||||
|
|
||||||
) as _fute_err:
|
) as fute_err:
|
||||||
fute_err = _fute_err
|
|
||||||
err_message: str = (
|
err_message: str = (
|
||||||
'main `asyncio` task '
|
'main `asyncio` task '
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue