Compare commits
No commits in common. "master" and "deprecate_arbiter_addr" have entirely different histories.
master
...
deprecate_
|
@ -6,14 +6,8 @@
|
|||
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
|
||||
built on trio_.
|
||||
|
||||
Fundamentally, ``tractor`` gives you parallelism via
|
||||
``trio``-"*actors*": independent Python processes (aka
|
||||
non-shared-memory threads) which maintain structured
|
||||
concurrency (SC) *end-to-end* inside a *supervision tree*.
|
||||
|
||||
Cross-process (and thus cross-host) SC is accomplished through the
|
||||
combined use of our "actor nurseries_" and an "SC-transitive IPC
|
||||
protocol" constructed on top of multiple Pythons each running a ``trio``
|
||||
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
|
||||
our nurseries_ let you spawn new Python processes which each run a ``trio``
|
||||
scheduled runtime - a call to ``trio.run()``.
|
||||
|
||||
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
|
||||
|
@ -29,8 +23,7 @@ Features
|
|||
- **It's just** a ``trio`` API
|
||||
- *Infinitely nesteable* process trees
|
||||
- Builtin IPC streaming APIs with task fan-out broadcasting
|
||||
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
|
||||
`pdb++`_ thanks to @mdmintz!)
|
||||
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
|
||||
- Support for a swappable, OS specific, process spawning layer
|
||||
- A modular transport stack, allowing for custom serialization (eg. with
|
||||
`msgspec`_), communications protocols, and environment specific IPC
|
||||
|
@ -156,7 +149,7 @@ it **is a bug**.
|
|||
|
||||
"Native" multi-process debugging
|
||||
--------------------------------
|
||||
Using the magic of `pdbp`_ and our internal IPC, we've
|
||||
Using the magic of `pdb++`_ and our internal IPC, we've
|
||||
been able to create a native feeling debugging experience for
|
||||
any (sub-)process in your ``tractor`` tree.
|
||||
|
||||
|
@ -604,7 +597,6 @@ channel`_!
|
|||
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
|
||||
.. _trio gitter channel: https://gitter.im/python-trio/general
|
||||
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
|
||||
.. _pdbp: https://github.com/mdmintz/pdbp
|
||||
.. _pdb++: https://github.com/pdbpp/pdbpp
|
||||
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
|
||||
.. _messages: https://en.wikipedia.org/wiki/Message_passing
|
||||
|
|
|
@ -1,151 +0,0 @@
|
|||
'''
|
||||
Complex edge case where during real-time streaming the IPC tranport
|
||||
channels are wiped out (purposely in this example though it could have
|
||||
been an outage) and we want to ensure that despite being in debug mode
|
||||
(or not) the user can sent SIGINT once they notice the hang and the
|
||||
actor tree will eventually be cancelled without leaving any zombies.
|
||||
|
||||
'''
|
||||
import trio
|
||||
from tractor import (
|
||||
open_nursery,
|
||||
context,
|
||||
Context,
|
||||
MsgStream,
|
||||
)
|
||||
|
||||
|
||||
async def break_channel_silently_then_error(
|
||||
stream: MsgStream,
|
||||
):
|
||||
async for msg in stream:
|
||||
await stream.send(msg)
|
||||
|
||||
# XXX: close the channel right after an error is raised
|
||||
# purposely breaking the IPC transport to make sure the parent
|
||||
# doesn't get stuck in debug or hang on the connection join.
|
||||
# this more or less simulates an infinite msg-receive hang on
|
||||
# the other end.
|
||||
await stream._ctx.chan.send(None)
|
||||
assert 0
|
||||
|
||||
|
||||
async def close_stream_and_error(
|
||||
stream: MsgStream,
|
||||
):
|
||||
async for msg in stream:
|
||||
await stream.send(msg)
|
||||
|
||||
# wipe out channel right before raising
|
||||
await stream._ctx.chan.send(None)
|
||||
await stream.aclose()
|
||||
assert 0
|
||||
|
||||
|
||||
@context
|
||||
async def recv_and_spawn_net_killers(
|
||||
|
||||
ctx: Context,
|
||||
break_ipc_after: bool | int = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Receive stream msgs and spawn some IPC killers mid-stream.
|
||||
|
||||
'''
|
||||
await ctx.started()
|
||||
async with (
|
||||
ctx.open_stream() as stream,
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
async for i in stream:
|
||||
print(f'child echoing {i}')
|
||||
await stream.send(i)
|
||||
if (
|
||||
break_ipc_after
|
||||
and i > break_ipc_after
|
||||
):
|
||||
'#################################\n'
|
||||
'Simulating child-side IPC BREAK!\n'
|
||||
'#################################'
|
||||
n.start_soon(break_channel_silently_then_error, stream)
|
||||
n.start_soon(close_stream_and_error, stream)
|
||||
|
||||
|
||||
async def main(
|
||||
debug_mode: bool = False,
|
||||
start_method: str = 'trio',
|
||||
|
||||
# by default we break the parent IPC first (if configured to break
|
||||
# at all), but this can be changed so the child does first (even if
|
||||
# both are set to break).
|
||||
break_parent_ipc_after: int | bool = False,
|
||||
break_child_ipc_after: int | bool = False,
|
||||
|
||||
) -> None:
|
||||
|
||||
async with (
|
||||
open_nursery(
|
||||
start_method=start_method,
|
||||
|
||||
# NOTE: even debugger is used we shouldn't get
|
||||
# a hang since it never engages due to broken IPC
|
||||
debug_mode=debug_mode,
|
||||
loglevel='warning',
|
||||
|
||||
) as an,
|
||||
):
|
||||
portal = await an.start_actor(
|
||||
'chitty_hijo',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async with portal.open_context(
|
||||
recv_and_spawn_net_killers,
|
||||
break_ipc_after=break_child_ipc_after,
|
||||
|
||||
) as (ctx, sent):
|
||||
async with ctx.open_stream() as stream:
|
||||
for i in range(1000):
|
||||
|
||||
if (
|
||||
break_parent_ipc_after
|
||||
and i > break_parent_ipc_after
|
||||
):
|
||||
print(
|
||||
'#################################\n'
|
||||
'Simulating parent-side IPC BREAK!\n'
|
||||
'#################################'
|
||||
)
|
||||
await stream._ctx.chan.send(None)
|
||||
|
||||
# it actually breaks right here in the
|
||||
# mp_spawn/forkserver backends and thus the zombie
|
||||
# reaper never even kicks in?
|
||||
print(f'parent sending {i}')
|
||||
await stream.send(i)
|
||||
|
||||
with trio.move_on_after(2) as cs:
|
||||
|
||||
# NOTE: in the parent side IPC failure case this
|
||||
# will raise an ``EndOfChannel`` after the child
|
||||
# is killed and sends a stop msg back to it's
|
||||
# caller/this-parent.
|
||||
rx = await stream.receive()
|
||||
|
||||
print(f"I'm a happy user and echoed to me is {rx}")
|
||||
|
||||
if cs.cancelled_caught:
|
||||
# pretend to be a user seeing no streaming action
|
||||
# thinking it's a hang, and then hitting ctl-c..
|
||||
print("YOO i'm a user anddd thingz hangin..")
|
||||
|
||||
print(
|
||||
"YOO i'm mad send side dun but thingz hangin..\n"
|
||||
'MASHING CTlR-C Ctl-c..'
|
||||
)
|
||||
raise KeyboardInterrupt
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
trio.run(main)
|
|
@ -1,24 +0,0 @@
|
|||
import os
|
||||
import sys
|
||||
|
||||
import trio
|
||||
import tractor
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
async with tractor.open_nursery(debug_mode=True) as an:
|
||||
|
||||
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
|
||||
|
||||
# TODO: an assert that verifies the hook has indeed been, hooked
|
||||
# XD
|
||||
assert sys.breakpointhook is not tractor._debug._set_trace
|
||||
|
||||
breakpoint()
|
||||
|
||||
# TODO: an assert that verifies the hook is unhooked..
|
||||
assert sys.breakpointhook
|
||||
breakpoint()
|
||||
|
||||
if __name__ == '__main__':
|
||||
trio.run(main)
|
|
@ -1,19 +0,0 @@
|
|||
Rework our ``.trionics.BroadcastReceiver`` internals to avoid method
|
||||
recursion and approach a design and interface closer to ``trio``'s
|
||||
``MemoryReceiveChannel``.
|
||||
|
||||
The details of the internal changes include:
|
||||
|
||||
- implementing a ``BroadcastReceiver.receive_nowait()`` and using it
|
||||
within the async ``.receive()`` thus avoiding recursion from
|
||||
``.receive()``.
|
||||
- failing over to an internal ``._receive_from_underlying()`` when the
|
||||
``_nowait()`` call raises ``trio.WouldBlock``
|
||||
- adding ``BroadcastState.statistics()`` for debugging and testing both
|
||||
internals and by users.
|
||||
- add an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be
|
||||
set to avoid ``Lagged`` raising for possible use cases where a user
|
||||
wants to choose between a [cheap or nasty
|
||||
pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern)
|
||||
the the particular stream (we use this in ``piker``'s dark clearing
|
||||
engine to avoid fast feeds breaking during HFT periods).
|
|
@ -1,15 +0,0 @@
|
|||
Fixes to ensure IPC (channel) breakage doesn't result in hung actor
|
||||
trees; the zombie reaping and general supervision machinery will always
|
||||
clean up and terminate.
|
||||
|
||||
This includes not only the (mostly minor) fixes to solve these cases but
|
||||
also a new extensive test suite in `test_advanced_faults.py` with an
|
||||
accompanying highly configurable example module-script in
|
||||
`examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we
|
||||
never get hang or zombies despite operating in debug mode and attempt to
|
||||
simulate all possible IPC transport failure cases for a local-host actor
|
||||
tree.
|
||||
|
||||
Further we simplify `Context.open_stream.__aexit__()` to just call
|
||||
`MsgStream.aclose()` directly more or less avoiding a pure duplicate
|
||||
code path.
|
|
@ -1,7 +0,0 @@
|
|||
Drop `trio.Process.aclose()` usage, copy into our spawning code.
|
||||
|
||||
The details are laid out in https://github.com/goodboy/tractor/issues/330.
|
||||
`trio` changed is process running quite some time ago, this just copies
|
||||
out the small bit we needed (from the old `.aclose()`) for hard kills
|
||||
where a soft runtime cancel request fails and our "zombie killer"
|
||||
implementation kicks in.
|
|
@ -1,15 +0,0 @@
|
|||
Switch to using the fork & fix of `pdb++`, `pdbp`:
|
||||
https://github.com/mdmintz/pdbp
|
||||
|
||||
Allows us to sidestep a variety of issues that aren't being maintained
|
||||
in the upstream project thanks to the hard work of @mdmintz!
|
||||
|
||||
We also include some default settings adjustments as per recent
|
||||
development on the fork:
|
||||
|
||||
- sticky mode is still turned on by default but now activates when
|
||||
a using the `ll` repl command.
|
||||
- turn off line truncation by default to avoid inter-line gaps when
|
||||
resizing the terimnal during use.
|
||||
- when using the backtrace cmd either by `w` or `bt`, the config
|
||||
automatically switches to non-sticky mode.
|
|
@ -1,7 +1,7 @@
|
|||
pytest
|
||||
pytest-trio
|
||||
pytest-timeout
|
||||
pdbp
|
||||
pdbpp
|
||||
mypy
|
||||
trio_typing
|
||||
pexpect
|
||||
|
|
13
setup.py
13
setup.py
|
@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
|
|||
setup(
|
||||
name="tractor",
|
||||
version='0.1.0a6dev0', # alpha zone
|
||||
description='structured concurrrent `trio`-"actors"',
|
||||
description='structured concurrrent "actors"',
|
||||
long_description=readme,
|
||||
license='AGPLv3',
|
||||
author='Tyler Goodlet',
|
||||
maintainer='Tyler Goodlet',
|
||||
maintainer_email='goodboy_foss@protonmail.com',
|
||||
maintainer_email='jgbt@protonmail.com',
|
||||
url='https://github.com/goodboy/tractor',
|
||||
platforms=['linux', 'windows'],
|
||||
packages=[
|
||||
|
@ -52,14 +52,16 @@ setup(
|
|||
# tooling
|
||||
'tricycle',
|
||||
'trio_typing',
|
||||
|
||||
# tooling
|
||||
'colorlog',
|
||||
'wrapt',
|
||||
|
||||
# IPC serialization
|
||||
# serialization
|
||||
'msgspec',
|
||||
|
||||
# debug mode REPL
|
||||
'pdbp',
|
||||
'pdbpp',
|
||||
|
||||
# pip ref docs on these specs:
|
||||
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
|
||||
|
@ -71,9 +73,10 @@ setup(
|
|||
# https://github.com/pdbpp/fancycompleter/issues/37
|
||||
'pyreadline3 ; platform_system == "Windows"',
|
||||
|
||||
|
||||
],
|
||||
tests_require=['pytest'],
|
||||
python_requires=">=3.10",
|
||||
python_requires=">=3.9",
|
||||
keywords=[
|
||||
'trio',
|
||||
'async',
|
||||
|
|
|
@ -7,7 +7,6 @@ import os
|
|||
import random
|
||||
import signal
|
||||
import platform
|
||||
import pathlib
|
||||
import time
|
||||
import inspect
|
||||
from functools import partial, wraps
|
||||
|
@ -114,21 +113,14 @@ no_windows = pytest.mark.skipif(
|
|||
)
|
||||
|
||||
|
||||
def repodir() -> pathlib.Path:
|
||||
'''
|
||||
Return the abspath to the repo directory.
|
||||
|
||||
'''
|
||||
# 2 parents up to step up through tests/<repo_dir>
|
||||
return pathlib.Path(__file__).parent.parent.absolute()
|
||||
|
||||
|
||||
def examples_dir() -> pathlib.Path:
|
||||
'''
|
||||
Return the abspath to the examples directory as `pathlib.Path`.
|
||||
|
||||
'''
|
||||
return repodir() / 'examples'
|
||||
def repodir():
|
||||
"""Return the abspath to the repo directory.
|
||||
"""
|
||||
dirname = os.path.dirname
|
||||
dirpath = os.path.abspath(
|
||||
dirname(dirname(os.path.realpath(__file__)))
|
||||
)
|
||||
return dirpath
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
|
@ -159,7 +151,7 @@ def loglevel(request):
|
|||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def spawn_backend(request) -> str:
|
||||
def spawn_backend(request):
|
||||
return request.config.option.spawn_backend
|
||||
|
||||
|
||||
|
|
|
@ -1,193 +0,0 @@
|
|||
'''
|
||||
Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
|
||||
cancelacion?..
|
||||
|
||||
'''
|
||||
from functools import partial
|
||||
|
||||
import pytest
|
||||
from _pytest.pathlib import import_path
|
||||
import trio
|
||||
import tractor
|
||||
|
||||
from conftest import (
|
||||
examples_dir,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'debug_mode',
|
||||
[False, True],
|
||||
ids=['no_debug_mode', 'debug_mode'],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'ipc_break',
|
||||
[
|
||||
# no breaks
|
||||
{
|
||||
'break_parent_ipc_after': False,
|
||||
'break_child_ipc_after': False,
|
||||
},
|
||||
|
||||
# only parent breaks
|
||||
{
|
||||
'break_parent_ipc_after': 500,
|
||||
'break_child_ipc_after': False,
|
||||
},
|
||||
|
||||
# only child breaks
|
||||
{
|
||||
'break_parent_ipc_after': False,
|
||||
'break_child_ipc_after': 500,
|
||||
},
|
||||
|
||||
# both: break parent first
|
||||
{
|
||||
'break_parent_ipc_after': 500,
|
||||
'break_child_ipc_after': 800,
|
||||
},
|
||||
# both: break child first
|
||||
{
|
||||
'break_parent_ipc_after': 800,
|
||||
'break_child_ipc_after': 500,
|
||||
},
|
||||
|
||||
],
|
||||
ids=[
|
||||
'no_break',
|
||||
'break_parent',
|
||||
'break_child',
|
||||
'break_both_parent_first',
|
||||
'break_both_child_first',
|
||||
],
|
||||
)
|
||||
def test_ipc_channel_break_during_stream(
|
||||
debug_mode: bool,
|
||||
spawn_backend: str,
|
||||
ipc_break: dict | None,
|
||||
):
|
||||
'''
|
||||
Ensure we can have an IPC channel break its connection during
|
||||
streaming and it's still possible for the (simulated) user to kill
|
||||
the actor tree using SIGINT.
|
||||
|
||||
We also verify the type of connection error expected in the parent
|
||||
depending on which side if the IPC breaks first.
|
||||
|
||||
'''
|
||||
if spawn_backend != 'trio':
|
||||
if debug_mode:
|
||||
pytest.skip('`debug_mode` only supported on `trio` spawner')
|
||||
|
||||
# non-`trio` spawners should never hit the hang condition that
|
||||
# requires the user to do ctl-c to cancel the actor tree.
|
||||
expect_final_exc = trio.ClosedResourceError
|
||||
|
||||
mod = import_path(
|
||||
examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py',
|
||||
root=examples_dir(),
|
||||
)
|
||||
|
||||
expect_final_exc = KeyboardInterrupt
|
||||
|
||||
# when ONLY the child breaks we expect the parent to get a closed
|
||||
# resource error on the next `MsgStream.receive()` and then fail out
|
||||
# and cancel the child from there.
|
||||
if (
|
||||
|
||||
# only child breaks
|
||||
(
|
||||
ipc_break['break_child_ipc_after']
|
||||
and ipc_break['break_parent_ipc_after'] is False
|
||||
)
|
||||
|
||||
# both break but, parent breaks first
|
||||
or (
|
||||
ipc_break['break_child_ipc_after'] is not False
|
||||
and (
|
||||
ipc_break['break_parent_ipc_after']
|
||||
> ipc_break['break_child_ipc_after']
|
||||
)
|
||||
)
|
||||
|
||||
):
|
||||
expect_final_exc = trio.ClosedResourceError
|
||||
|
||||
# when the parent IPC side dies (even if the child's does as well
|
||||
# but the child fails BEFORE the parent) we expect the channel to be
|
||||
# sent a stop msg from the child at some point which will signal the
|
||||
# parent that the stream has been terminated.
|
||||
# NOTE: when the parent breaks "after" the child you get this same
|
||||
# case as well, the child breaks the IPC channel with a stop msg
|
||||
# before any closure takes place.
|
||||
elif (
|
||||
# only parent breaks
|
||||
(
|
||||
ipc_break['break_parent_ipc_after']
|
||||
and ipc_break['break_child_ipc_after'] is False
|
||||
)
|
||||
|
||||
# both break but, child breaks first
|
||||
or (
|
||||
ipc_break['break_parent_ipc_after'] is not False
|
||||
and (
|
||||
ipc_break['break_child_ipc_after']
|
||||
> ipc_break['break_parent_ipc_after']
|
||||
)
|
||||
)
|
||||
):
|
||||
expect_final_exc = trio.EndOfChannel
|
||||
|
||||
with pytest.raises(expect_final_exc):
|
||||
trio.run(
|
||||
partial(
|
||||
mod.main,
|
||||
debug_mode=debug_mode,
|
||||
start_method=spawn_backend,
|
||||
**ipc_break,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def break_ipc_after_started(
|
||||
ctx: tractor.Context,
|
||||
) -> None:
|
||||
await ctx.started()
|
||||
async with ctx.open_stream() as stream:
|
||||
await stream.aclose()
|
||||
await trio.sleep(0.2)
|
||||
await ctx.chan.send(None)
|
||||
print('child broke IPC and terminating')
|
||||
|
||||
|
||||
def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
|
||||
'''
|
||||
Verify that is a subactor's IPC goes down just after bringing up a stream
|
||||
the parent can trigger a SIGINT and the child will be reaped out-of-IPC by
|
||||
the localhost process supervision machinery: aka "zombie lord".
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.start_actor(
|
||||
'ipc_breaker',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
with trio.move_on_after(1):
|
||||
async with (
|
||||
portal.open_context(
|
||||
break_ipc_after_started
|
||||
) as (ctx, sent),
|
||||
):
|
||||
async with ctx.open_stream():
|
||||
await trio.sleep(0.5)
|
||||
|
||||
print('parent waiting on context')
|
||||
|
||||
print('parent exited context')
|
||||
raise KeyboardInterrupt
|
||||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
trio.run(main)
|
|
@ -14,7 +14,7 @@ def is_win():
|
|||
return platform.system() == 'Windows'
|
||||
|
||||
|
||||
_registry: dict[str, set[tractor.MsgStream]] = {
|
||||
_registry: dict[str, set[tractor.ReceiveMsgStream]] = {
|
||||
'even': set(),
|
||||
'odd': set(),
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import itertools
|
|||
from os import path
|
||||
from typing import Optional
|
||||
import platform
|
||||
import pathlib
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
@ -25,10 +24,7 @@ from pexpect.exceptions import (
|
|||
EOF,
|
||||
)
|
||||
|
||||
from conftest import (
|
||||
examples_dir,
|
||||
_ci_env,
|
||||
)
|
||||
from conftest import repodir, _ci_env
|
||||
|
||||
# TODO: The next great debugger audit could be done by you!
|
||||
# - recurrent entry to breakpoint() from single actor *after* and an
|
||||
|
@ -47,13 +43,19 @@ if platform.system() == 'Windows':
|
|||
)
|
||||
|
||||
|
||||
def mk_cmd(ex_name: str) -> str:
|
||||
'''
|
||||
Generate a command suitable to pass to ``pexpect.spawn()``.
|
||||
def examples_dir():
|
||||
"""Return the abspath to the examples directory.
|
||||
"""
|
||||
return path.join(repodir(), 'examples', 'debugging/')
|
||||
|
||||
'''
|
||||
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
|
||||
return ' '.join(['python', str(script_path)])
|
||||
|
||||
def mk_cmd(ex_name: str) -> str:
|
||||
"""Generate a command suitable to pass to ``pexpect.spawn()``.
|
||||
"""
|
||||
return ' '.join(
|
||||
['python',
|
||||
path.join(examples_dir(), f'{ex_name}.py')]
|
||||
)
|
||||
|
||||
|
||||
# TODO: was trying to this xfail style but some weird bug i see in CI
|
||||
|
@ -95,7 +97,7 @@ def spawn(
|
|||
return _spawn
|
||||
|
||||
|
||||
PROMPT = r"\(Pdb\+\)"
|
||||
PROMPT = r"\(Pdb\+\+\)"
|
||||
|
||||
|
||||
def expect(
|
||||
|
@ -151,6 +153,18 @@ def ctlc(
|
|||
|
||||
use_ctlc = request.param
|
||||
|
||||
if (
|
||||
sys.version_info <= (3, 10)
|
||||
and use_ctlc
|
||||
):
|
||||
# on 3.9 it seems the REPL UX
|
||||
# is highly unreliable and frankly annoying
|
||||
# to test for. It does work from manual testing
|
||||
# but i just don't think it's wroth it to try
|
||||
# and get this working especially since we want to
|
||||
# be 3.10+ mega-asap.
|
||||
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
|
||||
|
||||
node = request.node
|
||||
markers = node.own_markers
|
||||
for mark in markers:
|
||||
|
@ -181,15 +195,13 @@ def ctlc(
|
|||
ids=lambda item: f'{item[0]} -> {item[1]}',
|
||||
)
|
||||
def test_root_actor_error(spawn, user_in_out):
|
||||
'''
|
||||
Demonstrate crash handler entering pdb from basic error in root actor.
|
||||
|
||||
'''
|
||||
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
|
||||
"""
|
||||
user_input, expect_err_str = user_in_out
|
||||
|
||||
child = spawn('root_actor_error')
|
||||
|
||||
# scan for the prompt
|
||||
# scan for the pdbpp prompt
|
||||
expect(child, PROMPT)
|
||||
|
||||
before = str(child.before.decode())
|
||||
|
@ -220,8 +232,8 @@ def test_root_actor_bp(spawn, user_in_out):
|
|||
user_input, expect_err_str = user_in_out
|
||||
child = spawn('root_actor_breakpoint')
|
||||
|
||||
# scan for the prompt
|
||||
child.expect(PROMPT)
|
||||
# scan for the pdbpp prompt
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
assert 'Error' not in str(child.before)
|
||||
|
||||
|
@ -262,7 +274,7 @@ def do_ctlc(
|
|||
if expect_prompt:
|
||||
before = str(child.before.decode())
|
||||
time.sleep(delay)
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
time.sleep(delay)
|
||||
|
||||
if patt:
|
||||
|
@ -281,7 +293,7 @@ def test_root_actor_bp_forever(
|
|||
# entries
|
||||
for _ in range(10):
|
||||
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
|
@ -291,7 +303,7 @@ def test_root_actor_bp_forever(
|
|||
# do one continue which should trigger a
|
||||
# new task to lock the tty
|
||||
child.sendline('continue')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# seems that if we hit ctrl-c too fast the
|
||||
# sigint guard machinery might not kick in..
|
||||
|
@ -302,10 +314,10 @@ def test_root_actor_bp_forever(
|
|||
|
||||
# XXX: this previously caused a bug!
|
||||
child.sendline('n')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
child.sendline('n')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# quit out of the loop
|
||||
child.sendline('q')
|
||||
|
@ -328,8 +340,8 @@ def test_subactor_error(
|
|||
'''
|
||||
child = spawn('subactor_error')
|
||||
|
||||
# scan for the prompt
|
||||
child.expect(PROMPT)
|
||||
# scan for the pdbpp prompt
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
before = str(child.before.decode())
|
||||
assert "Attaching to pdb in crashed actor: ('name_error'" in before
|
||||
|
@ -349,7 +361,7 @@ def test_subactor_error(
|
|||
# creating actor
|
||||
child.sendline('continue')
|
||||
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
before = str(child.before.decode())
|
||||
|
||||
# root actor gets debugger engaged
|
||||
|
@ -376,8 +388,8 @@ def test_subactor_breakpoint(
|
|||
|
||||
child = spawn('subactor_breakpoint')
|
||||
|
||||
# scan for the prompt
|
||||
child.expect(PROMPT)
|
||||
# scan for the pdbpp prompt
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
before = str(child.before.decode())
|
||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||
|
@ -386,7 +398,7 @@ def test_subactor_breakpoint(
|
|||
# entries
|
||||
for _ in range(10):
|
||||
child.sendline('next')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
|
@ -394,7 +406,7 @@ def test_subactor_breakpoint(
|
|||
# now run some "continues" to show re-entries
|
||||
for _ in range(5):
|
||||
child.sendline('continue')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
before = str(child.before.decode())
|
||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||
|
||||
|
@ -405,7 +417,7 @@ def test_subactor_breakpoint(
|
|||
child.sendline('q')
|
||||
|
||||
# child process should exit but parent will capture pdb.BdbQuit
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
before = str(child.before.decode())
|
||||
assert "RemoteActorError: ('breakpoint_forever'" in before
|
||||
|
@ -437,8 +449,8 @@ def test_multi_subactors(
|
|||
'''
|
||||
child = spawn(r'multi_subactors')
|
||||
|
||||
# scan for the prompt
|
||||
child.expect(PROMPT)
|
||||
# scan for the pdbpp prompt
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
before = str(child.before.decode())
|
||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||
|
@ -450,7 +462,7 @@ def test_multi_subactors(
|
|||
# entries
|
||||
for _ in range(10):
|
||||
child.sendline('next')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
|
@ -459,7 +471,7 @@ def test_multi_subactors(
|
|||
child.sendline('c')
|
||||
|
||||
# first name_error failure
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
before = str(child.before.decode())
|
||||
assert "Attaching to pdb in crashed actor: ('name_error'" in before
|
||||
assert "NameError" in before
|
||||
|
@ -471,7 +483,7 @@ def test_multi_subactors(
|
|||
child.sendline('c')
|
||||
|
||||
# 2nd name_error failure
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# TODO: will we ever get the race where this crash will show up?
|
||||
# blocklist strat now prevents this crash
|
||||
|
@ -485,7 +497,7 @@ def test_multi_subactors(
|
|||
|
||||
# breakpoint loop should re-engage
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
before = str(child.before.decode())
|
||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||
|
||||
|
@ -501,7 +513,7 @@ def test_multi_subactors(
|
|||
):
|
||||
child.sendline('c')
|
||||
time.sleep(0.1)
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
before = str(child.before.decode())
|
||||
|
||||
if ctlc:
|
||||
|
@ -520,11 +532,11 @@ def test_multi_subactors(
|
|||
# now run some "continues" to show re-entries
|
||||
for _ in range(5):
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# quit the loop and expect parent to attach
|
||||
child.sendline('q')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
before = str(child.before.decode())
|
||||
|
||||
assert_before(child, [
|
||||
|
@ -568,7 +580,7 @@ def test_multi_daemon_subactors(
|
|||
'''
|
||||
child = spawn('multi_daemon_subactors')
|
||||
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# there can be a race for which subactor will acquire
|
||||
# the root's tty lock first so anticipate either crash
|
||||
|
@ -598,7 +610,7 @@ def test_multi_daemon_subactors(
|
|||
# second entry by `bp_forever`.
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
assert_before(child, [next_msg])
|
||||
|
||||
# XXX: hooray the root clobbering the child here was fixed!
|
||||
|
@ -620,7 +632,7 @@ def test_multi_daemon_subactors(
|
|||
|
||||
# expect another breakpoint actor entry
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
try:
|
||||
assert_before(child, [bp_forever_msg])
|
||||
|
@ -636,7 +648,7 @@ def test_multi_daemon_subactors(
|
|||
# after 1 or more further bp actor entries.
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
assert_before(child, [name_error_msg])
|
||||
|
||||
# wait for final error in root
|
||||
|
@ -644,7 +656,7 @@ def test_multi_daemon_subactors(
|
|||
while True:
|
||||
try:
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
assert_before(
|
||||
child,
|
||||
[bp_forever_msg]
|
||||
|
@ -677,8 +689,8 @@ def test_multi_subactors_root_errors(
|
|||
'''
|
||||
child = spawn('multi_subactor_root_errors')
|
||||
|
||||
# scan for the prompt
|
||||
child.expect(PROMPT)
|
||||
# scan for the pdbpp prompt
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# at most one subactor should attach before the root is cancelled
|
||||
before = str(child.before.decode())
|
||||
|
@ -693,7 +705,7 @@ def test_multi_subactors_root_errors(
|
|||
|
||||
# due to block list strat from #337, this will no longer
|
||||
# propagate before the root errors and cancels the spawner sub-tree.
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# only if the blocking condition doesn't kick in fast enough
|
||||
before = str(child.before.decode())
|
||||
|
@ -708,7 +720,7 @@ def test_multi_subactors_root_errors(
|
|||
do_ctlc(child)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# check if the spawner crashed or was blocked from debug
|
||||
# and if this intermediary attached check the boxed error
|
||||
|
@ -725,7 +737,7 @@ def test_multi_subactors_root_errors(
|
|||
do_ctlc(child)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# expect a root actor crash
|
||||
assert_before(child, [
|
||||
|
@ -774,7 +786,7 @@ def test_multi_nested_subactors_error_through_nurseries(
|
|||
|
||||
for send_char in itertools.cycle(['c', 'q']):
|
||||
try:
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
child.sendline(send_char)
|
||||
time.sleep(0.01)
|
||||
|
||||
|
@ -816,7 +828,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
|||
|
||||
child = spawn('root_cancelled_but_child_is_in_tty_lock')
|
||||
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
before = str(child.before.decode())
|
||||
assert "NameError: name 'doggypants' is not defined" in before
|
||||
|
@ -831,7 +843,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
|||
for i in range(4):
|
||||
time.sleep(0.5)
|
||||
try:
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
except (
|
||||
EOF,
|
||||
|
@ -888,7 +900,7 @@ def test_root_cancels_child_context_during_startup(
|
|||
'''
|
||||
child = spawn('fast_error_in_root_after_spawn')
|
||||
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
before = str(child.before.decode())
|
||||
assert "AssertionError" in before
|
||||
|
@ -905,7 +917,7 @@ def test_different_debug_mode_per_actor(
|
|||
ctlc: bool,
|
||||
):
|
||||
child = spawn('per_actor_debug')
|
||||
child.expect(PROMPT)
|
||||
child.expect(r"\(Pdb\+\+\)")
|
||||
|
||||
# only one actor should enter the debugger
|
||||
before = str(child.before.decode())
|
||||
|
|
|
@ -12,17 +12,17 @@ import shutil
|
|||
|
||||
import pytest
|
||||
|
||||
from conftest import (
|
||||
examples_dir,
|
||||
)
|
||||
from conftest import repodir
|
||||
|
||||
|
||||
def examples_dir():
|
||||
"""Return the abspath to the examples directory.
|
||||
"""
|
||||
return os.path.join(repodir(), 'examples')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def run_example_in_subproc(
|
||||
loglevel: str,
|
||||
testdir,
|
||||
arb_addr: tuple[str, int],
|
||||
):
|
||||
def run_example_in_subproc(loglevel, testdir, arb_addr):
|
||||
|
||||
@contextmanager
|
||||
def run(script_code):
|
||||
|
@ -32,8 +32,8 @@ def run_example_in_subproc(
|
|||
# on windows we need to create a special __main__.py which will
|
||||
# be executed with ``python -m <modulename>`` on windows..
|
||||
shutil.copyfile(
|
||||
examples_dir() / '__main__.py',
|
||||
str(testdir / '__main__.py'),
|
||||
os.path.join(examples_dir(), '__main__.py'),
|
||||
os.path.join(str(testdir), '__main__.py')
|
||||
)
|
||||
|
||||
# drop the ``if __name__ == '__main__'`` guard onwards from
|
||||
|
@ -88,7 +88,6 @@ def run_example_in_subproc(
|
|||
and f[0] != '_'
|
||||
and 'debugging' not in p[0]
|
||||
and 'integration' not in p[0]
|
||||
and 'advanced_faults' not in p[0]
|
||||
],
|
||||
|
||||
ids=lambda t: t[1],
|
||||
|
|
|
@ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
|
|||
|
||||
results, diff = time_quad_ex
|
||||
assert results
|
||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
|
||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666
|
||||
assert diff < this_fast
|
||||
|
||||
|
|
@ -12,10 +12,7 @@ import pytest
|
|||
import trio
|
||||
from trio.lowlevel import current_task
|
||||
import tractor
|
||||
from tractor.trionics import (
|
||||
broadcast_receiver,
|
||||
Lagged,
|
||||
)
|
||||
from tractor.trionics import broadcast_receiver, Lagged
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
@ -40,7 +37,7 @@ async def echo_sequences(
|
|||
|
||||
async def ensure_sequence(
|
||||
|
||||
stream: tractor.MsgStream,
|
||||
stream: tractor.ReceiveMsgStream,
|
||||
sequence: list,
|
||||
delay: Optional[float] = None,
|
||||
|
||||
|
@ -214,8 +211,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
|
|||
arb_addr,
|
||||
start_method,
|
||||
):
|
||||
'''
|
||||
Ensure that if a faster task consuming from a stream is cancelled
|
||||
'''Ensure that if a faster task consuming from a stream is cancelled
|
||||
the slower task can continue to receive all expected values.
|
||||
|
||||
'''
|
||||
|
@ -464,51 +460,3 @@ def test_first_recver_is_cancelled():
|
|||
assert value == 1
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
def test_no_raise_on_lag():
|
||||
'''
|
||||
Run a simple 2-task broadcast where one task is slow but configured
|
||||
so that it does not raise `Lagged` on overruns using
|
||||
`raise_on_lasg=False` and verify that the task does not raise.
|
||||
|
||||
'''
|
||||
size = 100
|
||||
tx, rx = trio.open_memory_channel(size)
|
||||
brx = broadcast_receiver(rx, size)
|
||||
|
||||
async def slow():
|
||||
async with brx.subscribe(
|
||||
raise_on_lag=False,
|
||||
) as br:
|
||||
async for msg in br:
|
||||
print(f'slow task got: {msg}')
|
||||
await trio.sleep(0.1)
|
||||
|
||||
async def fast():
|
||||
async with brx.subscribe() as br:
|
||||
async for msg in br:
|
||||
print(f'fast task got: {msg}')
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
tractor.open_root_actor(
|
||||
# NOTE: so we see the warning msg emitted by the bcaster
|
||||
# internals when the no raise flag is set.
|
||||
loglevel='warning',
|
||||
),
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
n.start_soon(slow)
|
||||
n.start_soon(fast)
|
||||
|
||||
for i in range(1000):
|
||||
await tx.send(i)
|
||||
|
||||
# simulate user nailing ctl-c after realizing
|
||||
# there's a lag in the slow task.
|
||||
await trio.sleep(1)
|
||||
raise KeyboardInterrupt
|
||||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
trio.run(main)
|
||||
|
|
|
@ -24,6 +24,7 @@ from ._clustering import open_actor_cluster
|
|||
from ._ipc import Channel
|
||||
from ._streaming import (
|
||||
Context,
|
||||
ReceiveMsgStream,
|
||||
MsgStream,
|
||||
stream,
|
||||
context,
|
||||
|
@ -44,10 +45,7 @@ from ._exceptions import (
|
|||
ModuleNotExposed,
|
||||
ContextCancelled,
|
||||
)
|
||||
from ._debug import (
|
||||
breakpoint,
|
||||
post_mortem,
|
||||
)
|
||||
from ._debug import breakpoint, post_mortem
|
||||
from . import msg
|
||||
from ._root import (
|
||||
run_daemon,
|
||||
|
@ -66,6 +64,7 @@ __all__ = [
|
|||
'MsgStream',
|
||||
'BaseExceptionGroup',
|
||||
'Portal',
|
||||
'ReceiveMsgStream',
|
||||
'RemoteActorError',
|
||||
'breakpoint',
|
||||
'context',
|
||||
|
|
|
@ -37,7 +37,6 @@ from typing import (
|
|||
)
|
||||
from types import FrameType
|
||||
|
||||
import pdbp
|
||||
import tractor
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
|
@ -54,6 +53,17 @@ from ._exceptions import (
|
|||
)
|
||||
from ._ipc import Channel
|
||||
|
||||
|
||||
try:
|
||||
# wtf: only exported when installed in dev mode?
|
||||
import pdbpp
|
||||
except ImportError:
|
||||
# pdbpp is installed in regular mode...it monkey patches stuff
|
||||
import pdb
|
||||
xpm = getattr(pdb, 'xpm', None)
|
||||
assert xpm, "pdbpp is not installed?" # type: ignore
|
||||
pdbpp = pdb
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
|
@ -144,26 +154,22 @@ class Lock:
|
|||
cls.repl = None
|
||||
|
||||
|
||||
class TractorConfig(pdbp.DefaultConfig):
|
||||
class TractorConfig(pdbpp.DefaultConfig):
|
||||
'''
|
||||
Custom ``pdbp`` goodness :surfer:
|
||||
Custom ``pdbpp`` goodness.
|
||||
|
||||
'''
|
||||
use_pygments: bool = True
|
||||
sticky_by_default: bool = False
|
||||
enable_hidden_frames: bool = False
|
||||
|
||||
# much thanks @mdmintz for the hot tip!
|
||||
# fixes line spacing issue when resizing terminal B)
|
||||
truncate_long_lines: bool = False
|
||||
# use_pygments = True
|
||||
# sticky_by_default = True
|
||||
enable_hidden_frames = False
|
||||
|
||||
|
||||
class MultiActorPdb(pdbp.Pdb):
|
||||
class MultiActorPdb(pdbpp.Pdb):
|
||||
'''
|
||||
Add teardown hooks to the regular ``pdbp.Pdb``.
|
||||
Add teardown hooks to the regular ``pdbpp.Pdb``.
|
||||
|
||||
'''
|
||||
# override the pdbp config with our coolio one
|
||||
# override the pdbpp config with our coolio one
|
||||
DefaultConfig = TractorConfig
|
||||
|
||||
# def preloop(self):
|
||||
|
@ -307,7 +313,7 @@ async def lock_tty_for_child(
|
|||
) -> str:
|
||||
'''
|
||||
Lock the TTY in the root process of an actor tree in a new
|
||||
inter-actor-context-task such that the ``pdbp`` debugger console
|
||||
inter-actor-context-task such that the ``pdbpp`` debugger console
|
||||
can be mutex-allocated to the calling sub-actor for REPL control
|
||||
without interference by other processes / threads.
|
||||
|
||||
|
@ -427,7 +433,7 @@ async def wait_for_parent_stdin_hijack(
|
|||
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
|
||||
|
||||
pdb = MultiActorPdb()
|
||||
# signal.signal = pdbp.hideframe(signal.signal)
|
||||
# signal.signal = pdbpp.hideframe(signal.signal)
|
||||
|
||||
Lock.shield_sigint()
|
||||
|
||||
|
@ -577,7 +583,7 @@ async def _breakpoint(
|
|||
# # frame = sys._getframe()
|
||||
# # last_f = frame.f_back
|
||||
# # last_f.f_globals['__tracebackhide__'] = True
|
||||
# # signal.signal = pdbp.hideframe(signal.signal)
|
||||
# # signal.signal = pdbpp.hideframe(signal.signal)
|
||||
|
||||
|
||||
def shield_sigint_handler(
|
||||
|
@ -737,13 +743,13 @@ def shield_sigint_handler(
|
|||
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
||||
|
||||
# XXX LEGACY: lol, see ``pdbpp`` issue:
|
||||
# XXX: lol, see ``pdbpp`` issue:
|
||||
# https://github.com/pdbpp/pdbpp/issues/496
|
||||
|
||||
|
||||
def _set_trace(
|
||||
actor: tractor.Actor | None = None,
|
||||
pdb: MultiActorPdb | None = None,
|
||||
actor: Optional[tractor.Actor] = None,
|
||||
pdb: Optional[MultiActorPdb] = None,
|
||||
):
|
||||
__tracebackhide__ = True
|
||||
actor = actor or tractor.current_actor()
|
||||
|
@ -753,11 +759,7 @@ def _set_trace(
|
|||
if frame:
|
||||
frame = frame.f_back # type: ignore
|
||||
|
||||
if (
|
||||
frame
|
||||
and pdb
|
||||
and actor is not None
|
||||
):
|
||||
if frame and pdb and actor is not None:
|
||||
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
|
||||
# no f!#$&* idea, but when we're in async land
|
||||
# we need 2x frames up?
|
||||
|
@ -766,8 +768,7 @@ def _set_trace(
|
|||
else:
|
||||
pdb, undo_sigint = mk_mpdb()
|
||||
|
||||
# we entered the global ``breakpoint()`` built-in from sync
|
||||
# code?
|
||||
# we entered the global ``breakpoint()`` built-in from sync code?
|
||||
Lock.local_task_in_debug = 'sync'
|
||||
|
||||
pdb.set_trace(frame=frame)
|
||||
|
@ -797,7 +798,7 @@ def _post_mortem(
|
|||
# https://github.com/pdbpp/pdbpp/issues/480
|
||||
# TODO: help with a 3.10+ major release if/when it arrives.
|
||||
|
||||
pdbp.xpm(Pdb=lambda: pdb)
|
||||
pdbpp.xpm(Pdb=lambda: pdb)
|
||||
|
||||
|
||||
post_mortem = partial(
|
||||
|
|
|
@ -45,10 +45,7 @@ from ._exceptions import (
|
|||
NoResult,
|
||||
ContextCancelled,
|
||||
)
|
||||
from ._streaming import (
|
||||
Context,
|
||||
MsgStream,
|
||||
)
|
||||
from ._streaming import Context, ReceiveMsgStream
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
@ -104,7 +101,7 @@ class Portal:
|
|||
# it is expected that ``result()`` will be awaited at some
|
||||
# point.
|
||||
self._expect_result: Optional[Context] = None
|
||||
self._streams: set[MsgStream] = set()
|
||||
self._streams: set[ReceiveMsgStream] = set()
|
||||
self.actor = current_actor()
|
||||
|
||||
async def _submit_for_result(
|
||||
|
@ -319,7 +316,7 @@ class Portal:
|
|||
async_gen_func: Callable, # typing: ignore
|
||||
**kwargs,
|
||||
|
||||
) -> AsyncGenerator[MsgStream, None]:
|
||||
) -> AsyncGenerator[ReceiveMsgStream, None]:
|
||||
|
||||
if not inspect.isasyncgenfunction(async_gen_func):
|
||||
if not (
|
||||
|
@ -344,7 +341,7 @@ class Portal:
|
|||
|
||||
try:
|
||||
# deliver receive only stream
|
||||
async with MsgStream(
|
||||
async with ReceiveMsgStream(
|
||||
ctx, ctx._recv_chan,
|
||||
) as rchan:
|
||||
self._streams.add(rchan)
|
||||
|
@ -500,10 +497,6 @@ class Portal:
|
|||
f'actor: {uid}'
|
||||
)
|
||||
result = await ctx.result()
|
||||
log.runtime(
|
||||
f'Context {fn_name} returned '
|
||||
f'value from callee `{result}`'
|
||||
)
|
||||
|
||||
# though it should be impossible for any tasks
|
||||
# operating *in* this scope to have survived
|
||||
|
@ -525,6 +518,12 @@ class Portal:
|
|||
f'task:{cid}\n'
|
||||
f'actor:{uid}'
|
||||
)
|
||||
else:
|
||||
log.runtime(
|
||||
f'Context {fn_name} returned '
|
||||
f'value from callee `{result}`'
|
||||
)
|
||||
|
||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||
# wait for any immediate child in debug before popping the
|
||||
# context from the runtime msg loop otherwise inside
|
||||
|
|
|
@ -22,9 +22,8 @@ from contextlib import asynccontextmanager
|
|||
from functools import partial
|
||||
import importlib
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
import os
|
||||
import signal
|
||||
import typing
|
||||
import warnings
|
||||
|
||||
|
@ -85,10 +84,8 @@ async def open_root_actor(
|
|||
|
||||
'''
|
||||
# Override the global debugger hook to make it play nice with
|
||||
# ``trio``, see much discussion in:
|
||||
# ``trio``, see:
|
||||
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
||||
builtin_bp_handler = sys.breakpointhook
|
||||
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
|
||||
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
|
||||
|
||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||
|
@ -257,15 +254,6 @@ async def open_root_actor(
|
|||
await actor.cancel()
|
||||
finally:
|
||||
_state._current_actor = None
|
||||
|
||||
# restore breakpoint hook state
|
||||
sys.breakpointhook = builtin_bp_handler
|
||||
if orig_bp_path is not None:
|
||||
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
||||
else:
|
||||
# clear env back to having no entry
|
||||
os.environ.pop('PYTHONBREAKPOINT')
|
||||
|
||||
logger.runtime("Root actor terminated")
|
||||
|
||||
|
||||
|
@ -301,7 +289,7 @@ def run_daemon(
|
|||
async def _main():
|
||||
|
||||
async with open_root_actor(
|
||||
registry_addr=registry_addr,
|
||||
arbiter_addr=registry_addr,
|
||||
name=name,
|
||||
start_method=start_method,
|
||||
debug_mode=debug_mode,
|
||||
|
|
|
@ -228,11 +228,11 @@ async def _invoke(
|
|||
|
||||
fname = func.__name__
|
||||
if ctx._cancel_called:
|
||||
msg = f'`{fname}()` cancelled itself'
|
||||
msg = f'{fname} cancelled itself'
|
||||
|
||||
elif cs.cancel_called:
|
||||
msg = (
|
||||
f'`{fname}()` was remotely cancelled by its caller '
|
||||
f'{fname} was remotely cancelled by its caller '
|
||||
f'{ctx.chan.uid}'
|
||||
)
|
||||
|
||||
|
@ -319,7 +319,7 @@ async def _invoke(
|
|||
BrokenPipeError,
|
||||
):
|
||||
# if we can't propagate the error that's a big boo boo
|
||||
log.exception(
|
||||
log.error(
|
||||
f"Failed to ship error to caller @ {chan.uid} !?"
|
||||
)
|
||||
|
||||
|
@ -455,7 +455,7 @@ class Actor:
|
|||
self._mods: dict[str, ModuleType] = {}
|
||||
self.loglevel = loglevel
|
||||
|
||||
self._arb_addr: tuple[str, int] | None = (
|
||||
self._arb_addr = (
|
||||
str(arbiter_addr[0]),
|
||||
int(arbiter_addr[1])
|
||||
) if arbiter_addr else None
|
||||
|
@ -488,10 +488,7 @@ class Actor:
|
|||
self._parent_chan: Optional[Channel] = None
|
||||
self._forkserver_info: Optional[
|
||||
tuple[Any, Any, Any, Any, Any]] = None
|
||||
self._actoruid2nursery: dict[
|
||||
tuple[str, str],
|
||||
ActorNursery | None,
|
||||
] = {} # type: ignore # noqa
|
||||
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa
|
||||
|
||||
async def wait_for_peer(
|
||||
self, uid: tuple[str, str]
|
||||
|
@ -829,12 +826,7 @@ class Actor:
|
|||
|
||||
if ctx._backpressure:
|
||||
log.warning(text)
|
||||
try:
|
||||
await send_chan.send(msg)
|
||||
except trio.BrokenResourceError:
|
||||
# XXX: local consumer has closed their side
|
||||
# so cancel the far end streaming task
|
||||
log.warning(f"{chan} is already closed")
|
||||
else:
|
||||
try:
|
||||
raise StreamOverrun(text) from None
|
||||
|
@ -1379,12 +1371,10 @@ async def async_main(
|
|||
actor.lifetime_stack.close()
|
||||
|
||||
# Unregister actor from the arbiter
|
||||
if (
|
||||
registered_with_arbiter
|
||||
and not actor.is_arbiter
|
||||
if registered_with_arbiter and (
|
||||
actor._arb_addr is not None
|
||||
):
|
||||
failed = False
|
||||
assert isinstance(actor._arb_addr, tuple)
|
||||
with trio.move_on_after(0.5) as cs:
|
||||
cs.shield = True
|
||||
try:
|
||||
|
|
|
@ -23,12 +23,13 @@ import sys
|
|||
import platform
|
||||
from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Literal,
|
||||
Optional,
|
||||
Callable,
|
||||
TypeVar,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from collections.abc import Awaitable
|
||||
|
||||
from exceptiongroup import BaseExceptionGroup
|
||||
import trio
|
||||
|
@ -59,7 +60,7 @@ if TYPE_CHECKING:
|
|||
log = get_logger('tractor')
|
||||
|
||||
# placeholder for an mp start context if so using that backend
|
||||
_ctx: mp.context.BaseContext | None = None
|
||||
_ctx: Optional[mp.context.BaseContext] = None
|
||||
SpawnMethodKey = Literal[
|
||||
'trio', # supported on all platforms
|
||||
'mp_spawn',
|
||||
|
@ -85,7 +86,7 @@ else:
|
|||
def try_set_start_method(
|
||||
key: SpawnMethodKey
|
||||
|
||||
) -> mp.context.BaseContext | None:
|
||||
) -> Optional[mp.context.BaseContext]:
|
||||
'''
|
||||
Attempt to set the method for process starting, aka the "actor
|
||||
spawning backend".
|
||||
|
@ -199,37 +200,16 @@ async def cancel_on_completion(
|
|||
async def do_hard_kill(
|
||||
proc: trio.Process,
|
||||
terminate_after: int = 3,
|
||||
|
||||
) -> None:
|
||||
# NOTE: this timeout used to do nothing since we were shielding
|
||||
# the ``.wait()`` inside ``new_proc()`` which will pretty much
|
||||
# never release until the process exits, now it acts as
|
||||
# a hard-kill time ultimatum.
|
||||
log.debug(f"Terminating {proc}")
|
||||
with trio.move_on_after(terminate_after) as cs:
|
||||
|
||||
# NOTE: code below was copied verbatim from the now deprecated
|
||||
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
|
||||
# string:
|
||||
#
|
||||
# Close any pipes we have to the process (both input and output)
|
||||
# and wait for it to exit. If cancelled, kills the process and
|
||||
# waits for it to finish exiting before propagating the
|
||||
# cancellation.
|
||||
with trio.CancelScope(shield=True):
|
||||
if proc.stdin is not None:
|
||||
await proc.stdin.aclose()
|
||||
if proc.stdout is not None:
|
||||
await proc.stdout.aclose()
|
||||
if proc.stderr is not None:
|
||||
await proc.stderr.aclose()
|
||||
try:
|
||||
await proc.wait()
|
||||
finally:
|
||||
if proc.returncode is None:
|
||||
proc.kill()
|
||||
with trio.CancelScope(shield=True):
|
||||
await proc.wait()
|
||||
# NOTE: This ``__aexit__()`` shields internally.
|
||||
async with proc: # calls ``trio.Process.aclose()``
|
||||
log.debug(f"Terminating {proc}")
|
||||
|
||||
if cs.cancelled_caught:
|
||||
# XXX: should pretty much never get here unless we have
|
||||
|
@ -280,9 +260,7 @@ async def soft_wait(
|
|||
|
||||
if proc.poll() is None: # type: ignore
|
||||
log.warning(
|
||||
'Actor still alive after cancel request:\n'
|
||||
f'{uid}'
|
||||
)
|
||||
f'Process still alive after cancel request:\n{uid}')
|
||||
|
||||
n.cancel_scope.cancel()
|
||||
raise
|
||||
|
@ -375,11 +353,12 @@ async def trio_proc(
|
|||
spawn_cmd.append("--asyncio")
|
||||
|
||||
cancelled_during_spawn: bool = False
|
||||
proc: trio.Process | None = None
|
||||
proc: Optional[trio.Process] = None
|
||||
try:
|
||||
try:
|
||||
# TODO: needs ``trio_typing`` patch?
|
||||
proc = await trio.lowlevel.open_process(spawn_cmd)
|
||||
proc = await trio.lowlevel.open_process( # type: ignore
|
||||
spawn_cmd)
|
||||
|
||||
log.runtime(f"Started {proc}")
|
||||
|
||||
|
@ -463,8 +442,8 @@ async def trio_proc(
|
|||
nursery.cancel_scope.cancel()
|
||||
|
||||
finally:
|
||||
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
||||
# allowed! Do this **after** cancellation/teardown to avoid
|
||||
# The "hard" reap since no actor zombies are allowed!
|
||||
# XXX: do this **after** cancellation/tearfown to avoid
|
||||
# killing the process too early.
|
||||
if proc:
|
||||
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
||||
|
|
|
@ -50,13 +50,12 @@ log = get_logger(__name__)
|
|||
# - use __slots__ on ``Context``?
|
||||
|
||||
|
||||
class MsgStream(trio.abc.Channel):
|
||||
class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||
'''
|
||||
A bidirectional message stream for receiving logically sequenced
|
||||
values over an inter-actor IPC ``Channel``.
|
||||
|
||||
This is the type returned to a local task which entered either
|
||||
``Portal.open_stream_from()`` or ``Context.open_stream()``.
|
||||
A IPC message stream for receiving logically sequenced values over
|
||||
an inter-actor ``Channel``. This is the type returned to a local
|
||||
task which entered either ``Portal.open_stream_from()`` or
|
||||
``Context.open_stream()``.
|
||||
|
||||
Termination rules:
|
||||
|
||||
|
@ -98,9 +97,6 @@ class MsgStream(trio.abc.Channel):
|
|||
if self._eoc:
|
||||
raise trio.EndOfChannel
|
||||
|
||||
if self._closed:
|
||||
raise trio.ClosedResourceError('This stream was closed')
|
||||
|
||||
try:
|
||||
msg = await self._rx_chan.receive()
|
||||
return msg['yield']
|
||||
|
@ -114,9 +110,6 @@ class MsgStream(trio.abc.Channel):
|
|||
# - 'error'
|
||||
# possibly just handle msg['stop'] here!
|
||||
|
||||
if self._closed:
|
||||
raise trio.ClosedResourceError('This stream was closed')
|
||||
|
||||
if msg.get('stop') or self._eoc:
|
||||
log.debug(f"{self} was stopped at remote end")
|
||||
|
||||
|
@ -196,6 +189,7 @@ class MsgStream(trio.abc.Channel):
|
|||
return
|
||||
|
||||
self._eoc = True
|
||||
self._closed = True
|
||||
|
||||
# NOTE: this is super subtle IPC messaging stuff:
|
||||
# Relay stop iteration to far end **iff** we're
|
||||
|
@ -212,8 +206,12 @@ class MsgStream(trio.abc.Channel):
|
|||
|
||||
# In the bidirectional case, `Context.open_stream()` will create
|
||||
# the `Actor._cids2qs` entry from a call to
|
||||
# `Actor.get_context()` and will call us here to send the stop
|
||||
# msg in ``__aexit__()`` on teardown.
|
||||
# `Actor.get_context()` and will send the stop message in
|
||||
# ``__aexit__()`` on teardown so it **does not** need to be
|
||||
# called here.
|
||||
if not self._ctx._portal:
|
||||
# Only for 2 way streams can we can send stop from the
|
||||
# caller side.
|
||||
try:
|
||||
# NOTE: if this call is cancelled we expect this end to
|
||||
# handle as though the stop was never sent (though if it
|
||||
|
@ -230,14 +228,7 @@ class MsgStream(trio.abc.Channel):
|
|||
# the underlying channel may already have been pulled
|
||||
# in which case our stop message is meaningless since
|
||||
# it can't traverse the transport.
|
||||
ctx = self._ctx
|
||||
log.warning(
|
||||
f'Stream was already destroyed?\n'
|
||||
f'actor: {ctx.chan.uid}\n'
|
||||
f'ctx id: {ctx.cid}'
|
||||
)
|
||||
|
||||
self._closed = True
|
||||
log.debug(f'Channel for {self} was already closed')
|
||||
|
||||
# Do we close the local mem chan ``self._rx_chan`` ??!?
|
||||
|
||||
|
@ -280,8 +271,7 @@ class MsgStream(trio.abc.Channel):
|
|||
self,
|
||||
|
||||
) -> AsyncIterator[BroadcastReceiver]:
|
||||
'''
|
||||
Allocate and return a ``BroadcastReceiver`` which delegates
|
||||
'''Allocate and return a ``BroadcastReceiver`` which delegates
|
||||
to this message stream.
|
||||
|
||||
This allows multiple local tasks to receive each their own copy
|
||||
|
@ -318,15 +308,15 @@ class MsgStream(trio.abc.Channel):
|
|||
async with self._broadcaster.subscribe() as bstream:
|
||||
assert bstream.key != self._broadcaster.key
|
||||
assert bstream._recv == self._broadcaster._recv
|
||||
|
||||
# NOTE: we patch on a `.send()` to the bcaster so that the
|
||||
# caller can still conduct 2-way streaming using this
|
||||
# ``bstream`` handle transparently as though it was the msg
|
||||
# stream instance.
|
||||
bstream.send = self.send # type: ignore
|
||||
|
||||
yield bstream
|
||||
|
||||
|
||||
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||
'''
|
||||
Bidirectional message stream for use within an inter-actor actor
|
||||
``Context```.
|
||||
|
||||
'''
|
||||
async def send(
|
||||
self,
|
||||
data: Any
|
||||
|
@ -603,23 +593,23 @@ class Context:
|
|||
async with MsgStream(
|
||||
ctx=self,
|
||||
rx_chan=ctx._recv_chan,
|
||||
) as stream:
|
||||
) as rchan:
|
||||
|
||||
if self._portal:
|
||||
self._portal._streams.add(stream)
|
||||
self._portal._streams.add(rchan)
|
||||
|
||||
try:
|
||||
self._stream_opened = True
|
||||
|
||||
# XXX: do we need this?
|
||||
# ensure we aren't cancelled before yielding the stream
|
||||
# ensure we aren't cancelled before delivering
|
||||
# the stream
|
||||
# await trio.lowlevel.checkpoint()
|
||||
yield stream
|
||||
yield rchan
|
||||
|
||||
# NOTE: Make the stream "one-shot use". On exit, signal
|
||||
# XXX: Make the stream "one-shot use". On exit, signal
|
||||
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
|
||||
# far end.
|
||||
await stream.aclose()
|
||||
await self.send_stop()
|
||||
|
||||
finally:
|
||||
if self._portal:
|
||||
|
|
|
@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||
|
||||
# TODO: yay or nay?
|
||||
__tracebackhide__ = True
|
||||
# __tracebackhide__ = True
|
||||
|
||||
# the collection of errors retreived from spawned sub-actors
|
||||
errors: dict[tuple[str, str], BaseException] = {}
|
||||
|
|
|
@ -23,6 +23,7 @@ from __future__ import annotations
|
|||
from abc import abstractmethod
|
||||
from collections import deque
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
from functools import partial
|
||||
from operator import ne
|
||||
from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol
|
||||
|
@ -32,10 +33,7 @@ import trio
|
|||
from trio._core._run import Task
|
||||
from trio.abc import ReceiveChannel
|
||||
from trio.lowlevel import current_task
|
||||
from msgspec import Struct
|
||||
from tractor.log import get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
# A regular invariant generic type
|
||||
T = TypeVar("T")
|
||||
|
@ -88,7 +86,8 @@ class Lagged(trio.TooSlowError):
|
|||
'''
|
||||
|
||||
|
||||
class BroadcastState(Struct):
|
||||
@dataclass
|
||||
class BroadcastState:
|
||||
'''
|
||||
Common state to all receivers of a broadcast.
|
||||
|
||||
|
@ -111,35 +110,7 @@ class BroadcastState(Struct):
|
|||
eoc: bool = False
|
||||
|
||||
# If the broadcaster was cancelled, we might as well track it
|
||||
cancelled: dict[int, Task] = {}
|
||||
|
||||
def statistics(self) -> dict[str, Any]:
|
||||
'''
|
||||
Return broadcast receiver group "statistics" like many of
|
||||
``trio``'s internal task-sync primitives.
|
||||
|
||||
'''
|
||||
key: int | None
|
||||
ev: trio.Event | None
|
||||
|
||||
subs = self.subs
|
||||
if self.recv_ready is not None:
|
||||
key, ev = self.recv_ready
|
||||
else:
|
||||
key = ev = None
|
||||
|
||||
qlens: dict[int, int] = {}
|
||||
for tid, sz in subs.items():
|
||||
qlens[tid] = sz if sz != -1 else 0
|
||||
|
||||
return {
|
||||
'open_consumers': len(subs),
|
||||
'queued_len_by_task': qlens,
|
||||
'max_buffer_size': self.maxlen,
|
||||
'tasks_waiting': ev.statistics().tasks_waiting if ev else 0,
|
||||
'tasks_cancelled': self.cancelled,
|
||||
'next_value_receiver_id': key,
|
||||
}
|
||||
cancelled: bool = False
|
||||
|
||||
|
||||
class BroadcastReceiver(ReceiveChannel):
|
||||
|
@ -157,40 +128,23 @@ class BroadcastReceiver(ReceiveChannel):
|
|||
rx_chan: AsyncReceiver,
|
||||
state: BroadcastState,
|
||||
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
|
||||
raise_on_lag: bool = True,
|
||||
|
||||
) -> None:
|
||||
|
||||
# register the original underlying (clone)
|
||||
self.key = id(self)
|
||||
self._state = state
|
||||
|
||||
# each consumer has an int count which indicates
|
||||
# which index contains the next value that the task has not yet
|
||||
# consumed and thus should read. In the "up-to-date" case the
|
||||
# consumer task must wait for a new value from the underlying
|
||||
# receiver and we use ``-1`` as the sentinel for this state.
|
||||
state.subs[self.key] = -1
|
||||
|
||||
# underlying for this receiver
|
||||
self._rx = rx_chan
|
||||
self._recv = receive_afunc or rx_chan.receive
|
||||
self._closed: bool = False
|
||||
self._raise_on_lag = raise_on_lag
|
||||
|
||||
def receive_nowait(
|
||||
self,
|
||||
_key: int | None = None,
|
||||
_state: BroadcastState | None = None,
|
||||
async def receive(self) -> ReceiveType:
|
||||
|
||||
) -> Any:
|
||||
'''
|
||||
Sync version of `.receive()` which does all the low level work
|
||||
of receiving from the underlying/wrapped receive channel.
|
||||
|
||||
'''
|
||||
key = _key or self.key
|
||||
state = _state or self._state
|
||||
key = self.key
|
||||
state = self._state
|
||||
|
||||
# TODO: ideally we can make some way to "lock out" the
|
||||
# underlying receive channel in some way such that if some task
|
||||
|
@ -223,47 +177,32 @@ class BroadcastReceiver(ReceiveChannel):
|
|||
# return this value."
|
||||
# https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging
|
||||
|
||||
mxln = state.maxlen
|
||||
lost = seq - mxln
|
||||
|
||||
# decrement to the last value and expect
|
||||
# consumer to either handle the ``Lagged`` and come back
|
||||
# or bail out on its own (thus un-subscribing)
|
||||
state.subs[key] = mxln - 1
|
||||
state.subs[key] = state.maxlen - 1
|
||||
|
||||
# this task was overrun by the producer side
|
||||
task: Task = current_task()
|
||||
msg = f'Task `{task.name}` overrun and dropped `{lost}` values'
|
||||
|
||||
if self._raise_on_lag:
|
||||
raise Lagged(msg)
|
||||
else:
|
||||
log.warning(msg)
|
||||
return self.receive_nowait(_key, _state)
|
||||
raise Lagged(f'Task {task.name} was overrun')
|
||||
|
||||
state.subs[key] -= 1
|
||||
return value
|
||||
|
||||
raise trio.WouldBlock
|
||||
|
||||
async def _receive_from_underlying(
|
||||
self,
|
||||
key: int,
|
||||
state: BroadcastState,
|
||||
|
||||
) -> ReceiveType:
|
||||
# current task already has the latest value **and** is the
|
||||
# first task to begin waiting for a new one
|
||||
if state.recv_ready is None:
|
||||
|
||||
if self._closed:
|
||||
raise trio.ClosedResourceError
|
||||
|
||||
event = trio.Event()
|
||||
assert state.recv_ready is None
|
||||
state.recv_ready = key, event
|
||||
|
||||
try:
|
||||
# if we're cancelled here it should be
|
||||
# fine to bail without affecting any other consumers
|
||||
# right?
|
||||
try:
|
||||
value = await self._recv()
|
||||
|
||||
# items with lower indices are "newer"
|
||||
|
@ -281,6 +220,7 @@ class BroadcastReceiver(ReceiveChannel):
|
|||
# already retreived the last value
|
||||
|
||||
# XXX: which of these impls is fastest?
|
||||
|
||||
# subs = state.subs.copy()
|
||||
# subs.pop(key)
|
||||
|
||||
|
@ -311,85 +251,54 @@ class BroadcastReceiver(ReceiveChannel):
|
|||
# consumers will be awoken with a sequence of -1
|
||||
# and will potentially try to rewait the underlying
|
||||
# receiver instead of just cancelling immediately.
|
||||
self._state.cancelled[key] = current_task()
|
||||
self._state.cancelled = True
|
||||
if event.statistics().tasks_waiting:
|
||||
event.set()
|
||||
raise
|
||||
|
||||
finally:
|
||||
|
||||
# Reset receiver waiter task event for next blocking condition.
|
||||
# this MUST be reset even if the above ``.recv()`` call
|
||||
# was cancelled to avoid the next consumer from blocking on
|
||||
# an event that won't be set!
|
||||
state.recv_ready = None
|
||||
|
||||
async def receive(self) -> ReceiveType:
|
||||
key = self.key
|
||||
state = self._state
|
||||
|
||||
try:
|
||||
return self.receive_nowait(
|
||||
_key=key,
|
||||
_state=state,
|
||||
)
|
||||
except trio.WouldBlock:
|
||||
pass
|
||||
|
||||
# current task already has the latest value **and** is the
|
||||
# first task to begin waiting for a new one so we begin blocking
|
||||
# until rescheduled with the a new value from the underlying.
|
||||
if state.recv_ready is None:
|
||||
return await self._receive_from_underlying(key, state)
|
||||
|
||||
# This task is all caught up and ready to receive the latest
|
||||
# value, so queue/schedule it to be woken on the next internal
|
||||
# event.
|
||||
# value, so queue sched it on the internal event.
|
||||
else:
|
||||
while state.recv_ready is not None:
|
||||
# seq = state.subs[key]
|
||||
# assert seq == -1 # sanity
|
||||
seq = state.subs[key]
|
||||
assert seq == -1 # sanity
|
||||
_, ev = state.recv_ready
|
||||
await ev.wait()
|
||||
try:
|
||||
return self.receive_nowait(
|
||||
_key=key,
|
||||
_state=state,
|
||||
)
|
||||
except trio.WouldBlock:
|
||||
if self._closed:
|
||||
raise trio.ClosedResourceError
|
||||
|
||||
subs = state.subs
|
||||
if (
|
||||
len(subs) == 1
|
||||
and key in subs
|
||||
# or cancelled
|
||||
):
|
||||
# XXX: we are the last and only user of this BR so
|
||||
# likely it makes sense to unwind back to the
|
||||
# underlying?
|
||||
# import tractor
|
||||
# await tractor.breakpoint()
|
||||
log.warning(
|
||||
f'Only one sub left for {self}?\n'
|
||||
'We can probably unwind from breceiver?'
|
||||
)
|
||||
# NOTE: if we ever would like the behaviour where if the
|
||||
# first task to recv on the underlying is cancelled but it
|
||||
# still DOES trigger the ``.recv_ready``, event we'll likely need
|
||||
# this logic:
|
||||
|
||||
if seq > -1:
|
||||
# stuff from above..
|
||||
seq = state.subs[key]
|
||||
|
||||
value = state.queue[seq]
|
||||
state.subs[key] -= 1
|
||||
return value
|
||||
|
||||
elif seq == -1:
|
||||
# XXX: In the case where the first task to allocate the
|
||||
# ``.recv_ready`` event is cancelled we will be woken
|
||||
# with a non-incremented sequence number (the ``-1``
|
||||
# sentinel) and thus will read the oldest value if we
|
||||
# use that. Instead we need to detect if we have not
|
||||
# been incremented and then receive again.
|
||||
# return await self.receive()
|
||||
# ``.recv_ready`` event is cancelled we will be woken with
|
||||
# a non-incremented sequence number and thus will read the
|
||||
# oldest value if we use that. Instead we need to detect if
|
||||
# we have not been incremented and then receive again.
|
||||
return await self.receive()
|
||||
|
||||
return await self._receive_from_underlying(key, state)
|
||||
else:
|
||||
raise ValueError(f'Invalid sequence {seq}!?')
|
||||
|
||||
@asynccontextmanager
|
||||
async def subscribe(
|
||||
self,
|
||||
raise_on_lag: bool = True,
|
||||
|
||||
) -> AsyncIterator[BroadcastReceiver]:
|
||||
'''
|
||||
Subscribe for values from this broadcast receiver.
|
||||
|
@ -407,7 +316,6 @@ class BroadcastReceiver(ReceiveChannel):
|
|||
rx_chan=self._rx,
|
||||
state=state,
|
||||
receive_afunc=self._recv,
|
||||
raise_on_lag=raise_on_lag,
|
||||
)
|
||||
# assert clone in state.subs
|
||||
assert br.key in state.subs
|
||||
|
@ -444,8 +352,7 @@ def broadcast_receiver(
|
|||
|
||||
recv_chan: AsyncReceiver,
|
||||
max_buffer_size: int,
|
||||
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
|
||||
raise_on_lag: bool = True,
|
||||
**kwargs,
|
||||
|
||||
) -> BroadcastReceiver:
|
||||
|
||||
|
@ -456,6 +363,5 @@ def broadcast_receiver(
|
|||
maxlen=max_buffer_size,
|
||||
subs={},
|
||||
),
|
||||
receive_afunc=receive_afunc,
|
||||
raise_on_lag=raise_on_lag,
|
||||
**kwargs,
|
||||
)
|
||||
|
|
|
@ -133,12 +133,12 @@ async def gather_contexts(
|
|||
# deliver control once all managers have started up
|
||||
await all_entered.wait()
|
||||
|
||||
try:
|
||||
# NOTE: order *should* be preserved in the output values
|
||||
# since ``dict``s are now implicitly ordered.
|
||||
yield tuple(unwrapped.values())
|
||||
finally:
|
||||
# NOTE: this is ABSOLUTELY REQUIRED to avoid
|
||||
# the following wacky bug:
|
||||
# <tractorbugurlhere>
|
||||
|
||||
# we don't need a try/finally since cancellation will be triggered
|
||||
# by the surrounding nursery on error.
|
||||
parent_exit.set()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue