Compare commits

..

71 Commits

Author SHA1 Message Date
goodboy e5ee2e3de8
Merge pull request #358 from goodboy/switch_to_pdbp
Switch to `pdbp` 🏄🏼
2023-05-15 09:58:58 -04:00
Tyler Goodlet 41aa91c8eb Add news file 2023-05-15 09:35:59 -04:00
Tyler Goodlet 6758e4487c Drop lingering `pdbpp` comment-refs in tests 2023-05-15 09:14:42 -04:00
Tyler Goodlet 1c3893a383 Drop commented `pdbpp` import logic 2023-05-15 09:01:55 -04:00
Tyler Goodlet 73befac9bc Switch to `pdbp` in test reqs 2023-05-15 09:01:27 -04:00
Tyler Goodlet 79622bbeea Restore `breakpoint()` hook after runtime exits
Previously we were leaking our (pdb++) override into the Python runtime
which would always result in a runtime error whenever `breakpoint()` is
called outside our runtime; after exit of the root actor . This
explicitly restores any previous hook override (detected during startup)
or deletes the hook and restores the environment if none existed prior.

Also adds a new WIP debugging example script to ensure breakpointing
works as normal after runtime close; this will be added to the test
suite.
2023-05-15 00:47:29 -04:00
Tyler Goodlet 95535b2226 Some more 3.10+ optional type sigs 2023-05-15 00:47:29 -04:00
Tyler Goodlet 87c6e09d6b Switch readme links to point @ `pdbp` B) 2023-05-14 22:52:24 -04:00
Tyler Goodlet 9ccd3a74b6 More detailed preface description 2023-05-14 22:38:47 -04:00
Tyler Goodlet ae4ff5dc8d pdbp: adding typing to config settings vars 2023-05-14 22:38:46 -04:00
Tyler Goodlet 705538398f `pdbp`: turn off line truncating by default, fixes terminal resizing stuff 2023-05-14 22:38:16 -04:00
Tyler Goodlet 86aef5238d Hide actor nursery exit frame 2023-05-14 21:24:26 -04:00
Tyler Goodlet cc82447db6 First try: switch debug machinery over to `pdbp` B) 2023-05-14 21:24:26 -04:00
Tyler Goodlet 23cffbd940 Use multiline import for debug mod 2023-05-14 21:24:26 -04:00
Tyler Goodlet 3d202272c4 Change over debugger tests to use `PROMPT` var.. 2023-05-14 21:24:26 -04:00
Tyler Goodlet 63cdb0891f Switch to `pdbp` since noone is maintaining `pdbpp` 2023-05-14 21:24:26 -04:00
goodboy 0f7db27b68
Merge pull request #356 from goodboy/drop_proc_actxmngr
`trio.Process.aclose()`?
2023-05-14 20:59:53 -04:00
Tyler Goodlet c53d62d2f7 Add news file 2023-05-14 20:31:26 -04:00
Tyler Goodlet f667d16d66 Copy the now deprecated `trio.Process.aclose()`
Move it into our `_spawn.do_hard_kill()` since we do indeed rely on
the particular process killing sequence on "soft kill" failure cases.
2023-05-14 19:31:50 -04:00
Tyler Goodlet 24a062341e Just call `trio.Process.aclose()` directly for now? 2023-04-02 14:34:41 -04:00
goodboy e714bec8db
Merge pull request #355 from kehrazy/patch-1
fixed the `Zombie` example having wrong indentation
2023-04-01 12:11:47 -04:00
Igor 009cd6552e
fixed the `Zombie` example having wrong indentation 2023-03-31 17:50:46 +03:00
goodboy 649c5e7504
Merge pull request #343 from goodboy/breceiver_internals
Avoid inf recursion in `BroadcastReceiver.receive()`
2023-01-30 14:01:13 -05:00
Tyler Goodlet 203f95615c Add nooz 2023-01-30 12:42:26 -05:00
Tyler Goodlet efb8bec828 Add a basic no-raise-on lag test 2023-01-30 12:26:07 -05:00
Tyler Goodlet 8637778739 Expose `raise_on_lag: bool` flag through factory 2023-01-30 12:18:23 -05:00
Tyler Goodlet 47166e45f0 Be explicit with passthrough kwargs (there's so few) 2023-01-29 17:31:21 -05:00
Tyler Goodlet 4ce2dcd12b Switch back to raising `Lagged` by default
Makes the broadcast test suite not hang xD, and is our expected default
behaviour. Also removes a ton of commented legacy cruft from before the
refactor to remove the `.receive()` recursion and fixes some typing.

Oh right, and in the case where there's only one subscriber left we warn
log about it since in theory we could actually entirely unwind the
bcaster back to the original underlying, though not sure if that's sane
or works for some use cases (like wanting to have some other subscriber
get added dynamically later).
2023-01-29 15:03:34 -05:00
Tyler Goodlet 80f983818f Ignore monkey patched `.send()` type annot 2023-01-29 15:03:34 -05:00
Tyler Goodlet 6ba29f8d56 Recurse and get the last value when in warn mode 2023-01-29 15:03:34 -05:00
Tyler Goodlet 2707a0e971 Add `._raise_on_lag` flag to disable `Lag` raising 2023-01-29 15:03:34 -05:00
Tyler Goodlet c8efcdd0d3 Drop `ReceiveMsgStream` from test suite 2023-01-29 15:03:34 -05:00
Tyler Goodlet 9f9907271b Merge `ReceiveMsgStream` and `MsgStream`
Since one-way streaming can be accomplished by just *not* sending on one
side (and/or thus wrapping such usage in a more restrictive API), we
just drop the recv-only parent type. The only method different was
`MsgStream.send()`, now merged in. Further in usage of `.subscribe()`
we monkey patch the underlying stream's `.send()` onto the delivered
broadcast receiver so that subscriber tasks can two-way stream as though
using the stream directly.

This allows us to more definitively drop `tractor.open_stream_from()` in
the longer run if we so choose as well; note currently this will
potentially create an issue if a caller tries to `.send()` on such a one
way stream.
2023-01-29 15:03:34 -05:00
Tyler Goodlet c2367c1c5e Better `trio`-ize `BroadcastReceiver` internals
Driven by a bug found in `piker` where we'd get an inf recursion error
due to `BroadcastReceiver.receive()` being called when consumer tasks
are awoken but no value is ready to `.nowait_receive()`.

This new rework takes an approach closer to the interface and internals
of `trio.MemoryReceiveChannel` particularly in terms of,

- implementing a `BroadcastReceiver.receive_nowait()` and using it
  within the async `.receive()`.
- failing over to an internal `._receive_from_underlying()` when the
  `_nowait()` call raises `trio.WouldBlock`.
- adding `BroadcastState.statistics()` for debugging and testing
  dropping recursion from `.receive()`.
2023-01-29 15:03:34 -05:00
goodboy a777217674
Merge pull request #346 from goodboy/ipc_failure_while_streaming
Ipc failure while streaming
2023-01-29 15:02:54 -05:00
Tyler Goodlet 13c9eadc8f Move result log msg up and drop else block 2023-01-29 14:55:02 -05:00
Tyler Goodlet af6c325072 Bump up legacy streaming timeout a smidgen 2023-01-29 14:55:02 -05:00
Tyler Goodlet 195d2f0ed4 Add nooz 2023-01-29 14:55:02 -05:00
Tyler Goodlet aa4871b13d Call `MsgStream.aclose()` in `Context.open_stream.__aexit__()`
We weren't doing this originally I *think* just because of the path
dependent nature of the way the code was developed (originally being
mega pedantic about one-way vs. bidirectional streams) but, it doesn't
seem like there's any issue just calling the stream's `.aclose()`; also
have the benefit of just being less code and logic checks B)
2023-01-29 14:55:02 -05:00
Tyler Goodlet 556f4626db Tweak warning msg for still-alive-after-cancelled actor 2023-01-29 14:55:02 -05:00
Tyler Goodlet 3967c0ed9e Add a simplified zombie lord specific process reaping test 2023-01-29 14:55:02 -05:00
Tyler Goodlet e34823aab4 Add parent vs. child cancels first cases 2023-01-29 14:55:02 -05:00
Tyler Goodlet 6c35ba2cb6 Add IPC breakage on both parent and child side
With the new fancy `_pytest.pathlib.import_path()` we can do real
parametrization of the example-script-module code and thus configure
whether the child, parent, or both silently break the IPC connection.

Parametrize the test for all the above mentioned cases as well as the
case where the IPC never breaks but we still simulate the user hammering
ctl-c / SIGINT to terminate the actor tree. Adjust expected errors based
on each case and heavily document each of these.
2023-01-29 14:55:02 -05:00
Tyler Goodlet 3a0817ff55 Skip `advanced_faults/` subset in docs examples tests 2023-01-29 14:55:02 -05:00
Tyler Goodlet 7fddb4416b Handle `mp` spawn method cases in test suite 2023-01-29 14:55:02 -05:00
Tyler Goodlet 1d92f2552a Adjust other examples tests to expect `pathlib` objects 2023-01-29 14:55:02 -05:00
Tyler Goodlet 4f8586a928 Wrap ex in new test, change dir helpers to use `pathlib.Path` 2023-01-29 14:55:02 -05:00
Tyler Goodlet fb9ff45745 Move example to a new `advanced_faults` egs subset dir 2023-01-29 14:55:02 -05:00
Tyler Goodlet 36a83cb306 Refine example to drop IPC mid-stream
Use a task nursery in the subactor to spawn tasks which cancel the IPC
channel mid stream to simulate the most concurrent case we're likely to
see. Make `main()` accept a `debug_mode: bool` for parametrization. Fill
out detailed comments/docs on this example.
2023-01-29 14:55:02 -05:00
Tyler Goodlet 7394a187e0 Name one-way streaming (con generators) what it is 2023-01-29 14:55:02 -05:00
Tyler Goodlet df01294bb2 Show more functiony syntax in ctx-cancelled log msgs 2023-01-29 14:55:02 -05:00
Tyler Goodlet ddf3d0d1b3 Show tracebacks for un-shipped/propagated errors 2023-01-29 14:55:02 -05:00
Tyler Goodlet 158569adae Add WIP example of silent IPC breaks while streaming 2023-01-29 14:55:02 -05:00
Tyler Goodlet 97d5f7233b Fix uid2nursery lookup table type annot 2023-01-29 14:55:02 -05:00
Tyler Goodlet d27c081a15 Ensure arbiter sockaddr type before usage 2023-01-29 14:55:02 -05:00
Tyler Goodlet a4874a3227 Always set the `parent_exit: trio.Event` on exit 2023-01-29 14:55:02 -05:00
Tyler Goodlet de04bbb2bb Don't raise on a broken IPC-context when sending stop msg 2023-01-29 14:55:02 -05:00
Tyler Goodlet 4f977189c0 Handle broken mem chan on `Actor._push_result()`
When backpressure is used and a feeder mem chan breaks during msg
delivery (usually because the IPC allocating task already terminated)
instead of raising we simply warn as we do for the non-backpressure
case.

Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid
doing an arbiter-registry lookup if the current actor **is** the
registrar.
2023-01-29 14:55:02 -05:00
goodboy 9fd62cf71f
Merge pull request #348 from goodboy/deprecate_arbiter_addr
Begin deprecation of `arbiter_addr` -> `registry_addr`
2023-01-26 16:05:41 -05:00
Tyler Goodlet 606efa5bb7 Adjust daemon command to use new `registry_addr` 2023-01-26 16:00:08 -05:00
Tyler Goodlet 121a8cc891 Drop `Optional` usage from root mod 2023-01-26 16:00:08 -05:00
Tyler Goodlet c54b8ca4ba Begin deprecation of `arbiter_addr` -> `registry_addr` 2023-01-26 16:00:08 -05:00
goodboy de93c8257c
Merge pull request #349 from goodboy/prompt_on_ctrlc
Re-draw `pdbpp` prompt on `SIGINT`
2023-01-26 15:56:37 -05:00
Tyler Goodlet 5b8a87d0f6 Slightly better `xonsh` check hack, fix typing 2023-01-26 15:48:15 -05:00
Tyler Goodlet 9e5c8ce6f6 Add nooz file 2023-01-26 15:39:03 -05:00
Tyler Goodlet 965cd406a2 Use std `pdbpp` release 2023-01-26 15:27:55 -05:00
Tyler Goodlet 2e278ceb74 Add a super hacky check for `xonsh`, smh.. 2023-01-26 15:26:43 -05:00
Tyler Goodlet 6d124db7c9 Never run ctlc-with-intermediary-actor cases locally either 2023-01-26 12:44:13 -05:00
Tyler Goodlet dba8118553 Always attempt prompt redraw on ctl-c in REPL
The stdlib has all sorts of muckery with ignoring SIGINT in the
`Pdb._cmdloop()` but here we just override all that since we don't trust
their decisions about cancellation handling whatsoever. Adds
a `Lock.repl: MultiActorPdb` attr which is set by any task which
acquires root TTY lock indicating (via actor global state) that the
current actor is using the debugger REPL and can be expected to re-draw
the prompt on SIGINT. Further we mask out log messages from any actor
who also has the `shield_sigint_handler()` enabled to avoid logging
noise when debugging.
2023-01-26 12:44:13 -05:00
Tyler Goodlet fca2e7c10e Simplify closed abruptly log msg 2023-01-26 12:44:13 -05:00
Tyler Goodlet 5ed62c5c54 Add note about intermediary-actor in debug issue 2023-01-26 12:44:13 -05:00
30 changed files with 1114 additions and 1456 deletions

View File

@ -67,6 +67,7 @@ jobs:
] ]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v2
@ -84,40 +85,6 @@ jobs:
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
testing-macos:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
timeout-minutes: 10
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [macos-latest]
python: ['3.10']
spawn_backend: [
'trio',
]
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '${{ matrix.python }}'
- name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: List dependencies
run: pip list
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
# We skip 3.10 on windows for now due to not having any collabs to # We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very # debug the CI failures. Anyone wanting to hack and solve them is very
# welcome, but our primary user base is not using that OS. # welcome, but our primary user base is not using that OS.

View File

@ -6,8 +6,14 @@
``tractor`` is a `structured concurrent`_, multi-processing_ runtime ``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_. built on trio_.
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*": Fundamentally, ``tractor`` gives you parallelism via
our nurseries_ let you spawn new Python processes which each run a ``trio`` ``trio``-"*actors*": independent Python processes (aka
non-shared-memory threads) which maintain structured
concurrency (SC) *end-to-end* inside a *supervision tree*.
Cross-process (and thus cross-host) SC is accomplished through the
combined use of our "actor nurseries_" and an "SC-transitive IPC
protocol" constructed on top of multiple Pythons each running a ``trio``
scheduled runtime - a call to ``trio.run()``. scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_" We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
@ -23,7 +29,8 @@ Features
- **It's just** a ``trio`` API - **It's just** a ``trio`` API
- *Infinitely nesteable* process trees - *Infinitely nesteable* process trees
- Builtin IPC streaming APIs with task fan-out broadcasting - Builtin IPC streaming APIs with task fan-out broadcasting
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ - A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
`pdb++`_ thanks to @mdmintz!)
- Support for a swappable, OS specific, process spawning layer - Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. with - A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC `msgspec`_), communications protocols, and environment specific IPC
@ -149,7 +156,7 @@ it **is a bug**.
"Native" multi-process debugging "Native" multi-process debugging
-------------------------------- --------------------------------
Using the magic of `pdb++`_ and our internal IPC, we've Using the magic of `pdbp`_ and our internal IPC, we've
been able to create a native feeling debugging experience for been able to create a native feeling debugging experience for
any (sub-)process in your ``tractor`` tree. any (sub-)process in your ``tractor`` tree.
@ -597,6 +604,7 @@ channel`_!
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general .. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org .. _matrix channel: https://matrix.to/#/!tractor:matrix.org
.. _pdbp: https://github.com/mdmintz/pdbp
.. _pdb++: https://github.com/pdbpp/pdbpp .. _pdb++: https://github.com/pdbpp/pdbpp
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops .. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
.. _messages: https://en.wikipedia.org/wiki/Message_passing .. _messages: https://en.wikipedia.org/wiki/Message_passing

View File

@ -0,0 +1,151 @@
'''
Complex edge case where during real-time streaming the IPC tranport
channels are wiped out (purposely in this example though it could have
been an outage) and we want to ensure that despite being in debug mode
(or not) the user can sent SIGINT once they notice the hang and the
actor tree will eventually be cancelled without leaving any zombies.
'''
import trio
from tractor import (
open_nursery,
context,
Context,
MsgStream,
)
async def break_channel_silently_then_error(
stream: MsgStream,
):
async for msg in stream:
await stream.send(msg)
# XXX: close the channel right after an error is raised
# purposely breaking the IPC transport to make sure the parent
# doesn't get stuck in debug or hang on the connection join.
# this more or less simulates an infinite msg-receive hang on
# the other end.
await stream._ctx.chan.send(None)
assert 0
async def close_stream_and_error(
stream: MsgStream,
):
async for msg in stream:
await stream.send(msg)
# wipe out channel right before raising
await stream._ctx.chan.send(None)
await stream.aclose()
assert 0
@context
async def recv_and_spawn_net_killers(
ctx: Context,
break_ipc_after: bool | int = False,
) -> None:
'''
Receive stream msgs and spawn some IPC killers mid-stream.
'''
await ctx.started()
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
):
async for i in stream:
print(f'child echoing {i}')
await stream.send(i)
if (
break_ipc_after
and i > break_ipc_after
):
'#################################\n'
'Simulating child-side IPC BREAK!\n'
'#################################'
n.start_soon(break_channel_silently_then_error, stream)
n.start_soon(close_stream_and_error, stream)
async def main(
debug_mode: bool = False,
start_method: str = 'trio',
# by default we break the parent IPC first (if configured to break
# at all), but this can be changed so the child does first (even if
# both are set to break).
break_parent_ipc_after: int | bool = False,
break_child_ipc_after: int | bool = False,
) -> None:
async with (
open_nursery(
start_method=start_method,
# NOTE: even debugger is used we shouldn't get
# a hang since it never engages due to broken IPC
debug_mode=debug_mode,
loglevel='warning',
) as an,
):
portal = await an.start_actor(
'chitty_hijo',
enable_modules=[__name__],
)
async with portal.open_context(
recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after,
) as (ctx, sent):
async with ctx.open_stream() as stream:
for i in range(1000):
if (
break_parent_ipc_after
and i > break_parent_ipc_after
):
print(
'#################################\n'
'Simulating parent-side IPC BREAK!\n'
'#################################'
)
await stream._ctx.chan.send(None)
# it actually breaks right here in the
# mp_spawn/forkserver backends and thus the zombie
# reaper never even kicks in?
print(f'parent sending {i}')
await stream.send(i)
with trio.move_on_after(2) as cs:
# NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's
# caller/this-parent.
rx = await stream.receive()
print(f"I'm a happy user and echoed to me is {rx}")
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print("YOO i'm a user anddd thingz hangin..")
print(
"YOO i'm mad send side dun but thingz hangin..\n"
'MASHING CTlR-C Ctl-c..'
)
raise KeyboardInterrupt
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,24 @@
import os
import sys
import trio
import tractor
async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
# TODO: an assert that verifies the hook has indeed been, hooked
# XD
assert sys.breakpointhook is not tractor._debug._set_trace
breakpoint()
# TODO: an assert that verifies the hook is unhooked..
assert sys.breakpointhook
breakpoint()
if __name__ == '__main__':
trio.run(main)

View File

@ -12,11 +12,9 @@ async def stream_data(seed):
# this is the third actor; the aggregator # this is the third actor; the aggregator
async def aggregate(seed): async def aggregate(seed):
''' """Ensure that the two streams we receive match but only stream
Ensure that the two streams we receive match but only stream
a single set of values to the parent. a single set of values to the parent.
"""
'''
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
portals = [] portals = []
for i in range(1, 3): for i in range(1, 3):
@ -71,8 +69,7 @@ async def aggregate(seed):
async def main(): async def main():
# a nursery which spawns "actors" # a nursery which spawns "actors"
async with tractor.open_nursery( async with tractor.open_nursery(
arbiter_addr=('127.0.0.1', 1616), arbiter_addr=('127.0.0.1', 1616)
loglevel='cancel',
) as nursery: ) as nursery:
seed = int(1e3) seed = int(1e3)
@ -95,9 +92,6 @@ async def main():
async for value in stream: async for value in stream:
result_stream.append(value) result_stream.append(value)
print("ROOT STREAM CONSUMER COMPLETE")
print("ROOT CANCELLING AGGREGATOR CHILD")
await portal.cancel_actor() await portal.cancel_actor()
print(f"STREAM TIME = {time.time() - start}") print(f"STREAM TIME = {time.time() - start}")

View File

@ -0,0 +1,19 @@
Rework our ``.trionics.BroadcastReceiver`` internals to avoid method
recursion and approach a design and interface closer to ``trio``'s
``MemoryReceiveChannel``.
The details of the internal changes include:
- implementing a ``BroadcastReceiver.receive_nowait()`` and using it
within the async ``.receive()`` thus avoiding recursion from
``.receive()``.
- failing over to an internal ``._receive_from_underlying()`` when the
``_nowait()`` call raises ``trio.WouldBlock``
- adding ``BroadcastState.statistics()`` for debugging and testing both
internals and by users.
- add an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be
set to avoid ``Lagged`` raising for possible use cases where a user
wants to choose between a [cheap or nasty
pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern)
the the particular stream (we use this in ``piker``'s dark clearing
engine to avoid fast feeds breaking during HFT periods).

View File

@ -0,0 +1,15 @@
Fixes to ensure IPC (channel) breakage doesn't result in hung actor
trees; the zombie reaping and general supervision machinery will always
clean up and terminate.
This includes not only the (mostly minor) fixes to solve these cases but
also a new extensive test suite in `test_advanced_faults.py` with an
accompanying highly configurable example module-script in
`examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we
never get hang or zombies despite operating in debug mode and attempt to
simulate all possible IPC transport failure cases for a local-host actor
tree.
Further we simplify `Context.open_stream.__aexit__()` to just call
`MsgStream.aclose()` directly more or less avoiding a pure duplicate
code path.

View File

@ -0,0 +1,10 @@
Always redraw the `pdbpp` prompt on `SIGINT` during REPL use.
There was recent changes todo with Python 3.10 that required us to pin
to a specific commit in `pdbpp` which have recently been fixed minus
this last issue with `SIGINT` shielding: not clobbering or not
showing the `(Pdb++)` prompt on ctlr-c by the user. This repairs all
that by firstly removing the standard KBI intercepting of the std lib's
`pdb.Pdb._cmdloop()` as well as ensuring that only the actor with REPL
control ever reports `SIGINT` handler log msgs and prompt redraws. With
this we move back to using pypi `pdbpp` release.

View File

@ -0,0 +1,7 @@
Drop `trio.Process.aclose()` usage, copy into our spawning code.
The details are laid out in https://github.com/goodboy/tractor/issues/330.
`trio` changed is process running quite some time ago, this just copies
out the small bit we needed (from the old `.aclose()`) for hard kills
where a soft runtime cancel request fails and our "zombie killer"
implementation kicks in.

View File

@ -0,0 +1,15 @@
Switch to using the fork & fix of `pdb++`, `pdbp`:
https://github.com/mdmintz/pdbp
Allows us to sidestep a variety of issues that aren't being maintained
in the upstream project thanks to the hard work of @mdmintz!
We also include some default settings adjustments as per recent
development on the fork:
- sticky mode is still turned on by default but now activates when
a using the `ll` repl command.
- turn off line truncation by default to avoid inter-line gaps when
resizing the terimnal during use.
- when using the backtrace cmd either by `w` or `bt`, the config
automatically switches to non-sticky mode.

View File

@ -1,9 +1,8 @@
pytest pytest
pytest-trio pytest-trio
pytest-timeout pytest-timeout
pdbpp pdbp
mypy mypy
trio_typing trio_typing
pexpect pexpect
towncrier towncrier
numpy

View File

@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup( setup(
name="tractor", name="tractor",
version='0.1.0a6dev0', # alpha zone version='0.1.0a6dev0', # alpha zone
description='structured concurrrent "actors"', description='structured concurrrent `trio`-"actors"',
long_description=readme, long_description=readme,
license='AGPLv3', license='AGPLv3',
author='Tyler Goodlet', author='Tyler Goodlet',
maintainer='Tyler Goodlet', maintainer='Tyler Goodlet',
maintainer_email='jgbt@protonmail.com', maintainer_email='goodboy_foss@protonmail.com',
url='https://github.com/goodboy/tractor', url='https://github.com/goodboy/tractor',
platforms=['linux', 'windows'], platforms=['linux', 'windows'],
packages=[ packages=[
@ -52,14 +52,15 @@ setup(
# tooling # tooling
'tricycle', 'tricycle',
'trio_typing', 'trio_typing',
# tooling
'colorlog', 'colorlog',
'wrapt', 'wrapt',
# serialization # IPC serialization
'msgspec', 'msgspec',
# debug mode REPL
'pdbp',
# pip ref docs on these specs: # pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep: # and pep:
@ -70,14 +71,9 @@ setup(
# https://github.com/pdbpp/fancycompleter/issues/37 # https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"', 'pyreadline3 ; platform_system == "Windows"',
# 3.10 has an outstanding unreleased issue and `pdbpp` itself
# pins to patched forks of its own dependencies as well..and
# we need a specific patch on master atm.
'pdbpp @ git+https://github.com/pdbpp/pdbpp@76c4be5#egg=pdbpp ; python_version > "3.9"', # noqa: E501
], ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.9", python_requires=">=3.10",
keywords=[ keywords=[
'trio', 'trio',
'async', 'async',

View File

@ -7,6 +7,7 @@ import os
import random import random
import signal import signal
import platform import platform
import pathlib
import time import time
import inspect import inspect
from functools import partial, wraps from functools import partial, wraps
@ -113,14 +114,21 @@ no_windows = pytest.mark.skipif(
) )
def repodir(): def repodir() -> pathlib.Path:
"""Return the abspath to the repo directory. '''
""" Return the abspath to the repo directory.
dirname = os.path.dirname
dirpath = os.path.abspath( '''
dirname(dirname(os.path.realpath(__file__))) # 2 parents up to step up through tests/<repo_dir>
) return pathlib.Path(__file__).parent.parent.absolute()
return dirpath
def examples_dir() -> pathlib.Path:
'''
Return the abspath to the examples directory as `pathlib.Path`.
'''
return repodir() / 'examples'
def pytest_addoption(parser): def pytest_addoption(parser):
@ -151,7 +159,7 @@ def loglevel(request):
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def spawn_backend(request): def spawn_backend(request) -> str:
return request.config.option.spawn_backend return request.config.option.spawn_backend
@ -205,16 +213,22 @@ def sig_prog(proc, sig):
@pytest.fixture @pytest.fixture
def daemon(loglevel, testdir, arb_addr): def daemon(
"""Run a daemon actor as a "remote arbiter". loglevel: str,
""" testdir,
arb_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote arbiter".
'''
if loglevel in ('trace', 'debug'): if loglevel in ('trace', 'debug'):
# too much logging will lock up the subproc (smh) # too much logging will lock up the subproc (smh)
loglevel = 'info' loglevel = 'info'
cmdargs = [ cmdargs = [
sys.executable, '-c', sys.executable, '-c',
"import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})" "import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})"
.format( .format(
arb_addr, arb_addr,
"'{}'".format(loglevel) if loglevel else None) "'{}'".format(loglevel) if loglevel else None)

View File

@ -0,0 +1,193 @@
'''
Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
cancelacion?..
'''
from functools import partial
import pytest
from _pytest.pathlib import import_path
import trio
import tractor
from conftest import (
examples_dir,
)
@pytest.mark.parametrize(
'debug_mode',
[False, True],
ids=['no_debug_mode', 'debug_mode'],
)
@pytest.mark.parametrize(
'ipc_break',
[
# no breaks
{
'break_parent_ipc_after': False,
'break_child_ipc_after': False,
},
# only parent breaks
{
'break_parent_ipc_after': 500,
'break_child_ipc_after': False,
},
# only child breaks
{
'break_parent_ipc_after': False,
'break_child_ipc_after': 500,
},
# both: break parent first
{
'break_parent_ipc_after': 500,
'break_child_ipc_after': 800,
},
# both: break child first
{
'break_parent_ipc_after': 800,
'break_child_ipc_after': 500,
},
],
ids=[
'no_break',
'break_parent',
'break_child',
'break_both_parent_first',
'break_both_child_first',
],
)
def test_ipc_channel_break_during_stream(
debug_mode: bool,
spawn_backend: str,
ipc_break: dict | None,
):
'''
Ensure we can have an IPC channel break its connection during
streaming and it's still possible for the (simulated) user to kill
the actor tree using SIGINT.
We also verify the type of connection error expected in the parent
depending on which side if the IPC breaks first.
'''
if spawn_backend != 'trio':
if debug_mode:
pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
expect_final_exc = trio.ClosedResourceError
mod = import_path(
examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py',
root=examples_dir(),
)
expect_final_exc = KeyboardInterrupt
# when ONLY the child breaks we expect the parent to get a closed
# resource error on the next `MsgStream.receive()` and then fail out
# and cancel the child from there.
if (
# only child breaks
(
ipc_break['break_child_ipc_after']
and ipc_break['break_parent_ipc_after'] is False
)
# both break but, parent breaks first
or (
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after']
)
)
):
expect_final_exc = trio.ClosedResourceError
# when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we expect the channel to be
# sent a stop msg from the child at some point which will signal the
# parent that the stream has been terminated.
# NOTE: when the parent breaks "after" the child you get this same
# case as well, the child breaks the IPC channel with a stop msg
# before any closure takes place.
elif (
# only parent breaks
(
ipc_break['break_parent_ipc_after']
and ipc_break['break_child_ipc_after'] is False
)
# both break but, child breaks first
or (
ipc_break['break_parent_ipc_after'] is not False
and (
ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after']
)
)
):
expect_final_exc = trio.EndOfChannel
with pytest.raises(expect_final_exc):
trio.run(
partial(
mod.main,
debug_mode=debug_mode,
start_method=spawn_backend,
**ipc_break,
)
)
@tractor.context
async def break_ipc_after_started(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
await stream.aclose()
await trio.sleep(0.2)
await ctx.chan.send(None)
print('child broke IPC and terminating')
def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
'''
Verify that is a subactor's IPC goes down just after bringing up a stream
the parent can trigger a SIGINT and the child will be reaped out-of-IPC by
the localhost process supervision machinery: aka "zombie lord".
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ipc_breaker',
enable_modules=[__name__],
)
with trio.move_on_after(1):
async with (
portal.open_context(
break_ipc_after_started
) as (ctx, sent),
):
async with ctx.open_stream():
await trio.sleep(0.5)
print('parent waiting on context')
print('parent exited context')
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@ -14,7 +14,7 @@ def is_win():
return platform.system() == 'Windows' return platform.system() == 'Windows'
_registry: dict[str, set[tractor.ReceiveMsgStream]] = { _registry: dict[str, set[tractor.MsgStream]] = {
'even': set(), 'even': set(),
'odd': set(), 'odd': set(),
} }

View File

@ -14,6 +14,7 @@ import itertools
from os import path from os import path
from typing import Optional from typing import Optional
import platform import platform
import pathlib
import sys import sys
import time import time
@ -24,7 +25,10 @@ from pexpect.exceptions import (
EOF, EOF,
) )
from conftest import repodir, _ci_env from conftest import (
examples_dir,
_ci_env,
)
# TODO: The next great debugger audit could be done by you! # TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an # - recurrent entry to breakpoint() from single actor *after* and an
@ -36,29 +40,20 @@ from conftest import repodir, _ci_env
# - recurrent root errors # - recurrent root errors
if osname := platform.system() in ( if platform.system() == 'Windows':
'Windows',
'Darwin',
):
pytest.skip( pytest.skip(
'Debugger tests have no {osname} support (yet)', 'Debugger tests have no windows support (yet)',
allow_module_level=True, allow_module_level=True,
) )
def examples_dir():
"""Return the abspath to the examples directory.
"""
return path.join(repodir(), 'examples', 'debugging/')
def mk_cmd(ex_name: str) -> str: def mk_cmd(ex_name: str) -> str:
"""Generate a command suitable to pass to ``pexpect.spawn()``. '''
""" Generate a command suitable to pass to ``pexpect.spawn()``.
return ' '.join(
['python', '''
path.join(examples_dir(), f'{ex_name}.py')] script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
) return ' '.join(['python', str(script_path)])
# TODO: was trying to this xfail style but some weird bug i see in CI # TODO: was trying to this xfail style but some weird bug i see in CI
@ -100,7 +95,7 @@ def spawn(
return _spawn return _spawn
PROMPT = r"\(Pdb\+\+\)" PROMPT = r"\(Pdb\+\)"
def expect( def expect(
@ -156,27 +151,14 @@ def ctlc(
use_ctlc = request.param use_ctlc = request.param
if (
sys.version_info <= (3, 10)
and use_ctlc
):
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
if ci_env:
node = request.node node = request.node
markers = node.own_markers markers = node.own_markers
for mark in markers: for mark in markers:
if mark.name == 'has_nested_actors': if mark.name == 'has_nested_actors':
pytest.skip( pytest.skip(
f'Test for {node} uses nested actors and fails in CI\n' f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test seems to run fine locally but until we solve' f'The test can sometimes run fine locally but until'
'this issue this CI test will be xfail:\n' ' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320' 'https://github.com/goodboy/tractor/issues/320'
) )
@ -199,13 +181,15 @@ def ctlc(
ids=lambda item: f'{item[0]} -> {item[1]}', ids=lambda item: f'{item[0]} -> {item[1]}',
) )
def test_root_actor_error(spawn, user_in_out): def test_root_actor_error(spawn, user_in_out):
"""Demonstrate crash handler entering pdbpp from basic error in root actor. '''
""" Demonstrate crash handler entering pdb from basic error in root actor.
'''
user_input, expect_err_str = user_in_out user_input, expect_err_str = user_in_out
child = spawn('root_actor_error') child = spawn('root_actor_error')
# scan for the pdbpp prompt # scan for the prompt
expect(child, PROMPT) expect(child, PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
@ -236,8 +220,8 @@ def test_root_actor_bp(spawn, user_in_out):
user_input, expect_err_str = user_in_out user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint') child = spawn('root_actor_breakpoint')
# scan for the pdbpp prompt # scan for the prompt
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
assert 'Error' not in str(child.before) assert 'Error' not in str(child.before)
@ -278,7 +262,7 @@ def do_ctlc(
if expect_prompt: if expect_prompt:
before = str(child.before.decode()) before = str(child.before.decode())
time.sleep(delay) time.sleep(delay)
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
time.sleep(delay) time.sleep(delay)
if patt: if patt:
@ -297,7 +281,7 @@ def test_root_actor_bp_forever(
# entries # entries
for _ in range(10): for _ in range(10):
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -307,7 +291,7 @@ def test_root_actor_bp_forever(
# do one continue which should trigger a # do one continue which should trigger a
# new task to lock the tty # new task to lock the tty
child.sendline('continue') child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# seems that if we hit ctrl-c too fast the # seems that if we hit ctrl-c too fast the
# sigint guard machinery might not kick in.. # sigint guard machinery might not kick in..
@ -318,10 +302,10 @@ def test_root_actor_bp_forever(
# XXX: this previously caused a bug! # XXX: this previously caused a bug!
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# quit out of the loop # quit out of the loop
child.sendline('q') child.sendline('q')
@ -344,8 +328,8 @@ def test_subactor_error(
''' '''
child = spawn('subactor_error') child = spawn('subactor_error')
# scan for the pdbpp prompt # scan for the prompt
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
@ -365,7 +349,7 @@ def test_subactor_error(
# creating actor # creating actor
child.sendline('continue') child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
@ -392,8 +376,8 @@ def test_subactor_breakpoint(
child = spawn('subactor_breakpoint') child = spawn('subactor_breakpoint')
# scan for the pdbpp prompt # scan for the prompt
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -402,7 +386,7 @@ def test_subactor_breakpoint(
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next') child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -410,7 +394,7 @@ def test_subactor_breakpoint(
# now run some "continues" to show re-entries # now run some "continues" to show re-entries
for _ in range(5): for _ in range(5):
child.sendline('continue') child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -421,7 +405,7 @@ def test_subactor_breakpoint(
child.sendline('q') child.sendline('q')
# child process should exit but parent will capture pdb.BdbQuit # child process should exit but parent will capture pdb.BdbQuit
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before assert "RemoteActorError: ('breakpoint_forever'" in before
@ -453,8 +437,8 @@ def test_multi_subactors(
''' '''
child = spawn(r'multi_subactors') child = spawn(r'multi_subactors')
# scan for the pdbpp prompt # scan for the prompt
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -466,7 +450,7 @@ def test_multi_subactors(
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next') child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -475,7 +459,7 @@ def test_multi_subactors(
child.sendline('c') child.sendline('c')
# first name_error failure # first name_error failure
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before assert "NameError" in before
@ -487,7 +471,7 @@ def test_multi_subactors(
child.sendline('c') child.sendline('c')
# 2nd name_error failure # 2nd name_error failure
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# TODO: will we ever get the race where this crash will show up? # TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash # blocklist strat now prevents this crash
@ -501,7 +485,7 @@ def test_multi_subactors(
# breakpoint loop should re-engage # breakpoint loop should re-engage
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -517,7 +501,7 @@ def test_multi_subactors(
): ):
child.sendline('c') child.sendline('c')
time.sleep(0.1) time.sleep(0.1)
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
if ctlc: if ctlc:
@ -536,11 +520,11 @@ def test_multi_subactors(
# now run some "continues" to show re-entries # now run some "continues" to show re-entries
for _ in range(5): for _ in range(5):
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# quit the loop and expect parent to attach # quit the loop and expect parent to attach
child.sendline('q') child.sendline('q')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert_before(child, [ assert_before(child, [
@ -584,7 +568,7 @@ def test_multi_daemon_subactors(
''' '''
child = spawn('multi_daemon_subactors') child = spawn('multi_daemon_subactors')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# there can be a race for which subactor will acquire # there can be a race for which subactor will acquire
# the root's tty lock first so anticipate either crash # the root's tty lock first so anticipate either crash
@ -614,7 +598,7 @@ def test_multi_daemon_subactors(
# second entry by `bp_forever`. # second entry by `bp_forever`.
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
assert_before(child, [next_msg]) assert_before(child, [next_msg])
# XXX: hooray the root clobbering the child here was fixed! # XXX: hooray the root clobbering the child here was fixed!
@ -636,7 +620,7 @@ def test_multi_daemon_subactors(
# expect another breakpoint actor entry # expect another breakpoint actor entry
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
try: try:
assert_before(child, [bp_forever_msg]) assert_before(child, [bp_forever_msg])
@ -652,7 +636,7 @@ def test_multi_daemon_subactors(
# after 1 or more further bp actor entries. # after 1 or more further bp actor entries.
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
assert_before(child, [name_error_msg]) assert_before(child, [name_error_msg])
# wait for final error in root # wait for final error in root
@ -660,7 +644,7 @@ def test_multi_daemon_subactors(
while True: while True:
try: try:
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
assert_before( assert_before(
child, child,
[bp_forever_msg] [bp_forever_msg]
@ -693,8 +677,8 @@ def test_multi_subactors_root_errors(
''' '''
child = spawn('multi_subactor_root_errors') child = spawn('multi_subactor_root_errors')
# scan for the pdbpp prompt # scan for the prompt
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# at most one subactor should attach before the root is cancelled # at most one subactor should attach before the root is cancelled
before = str(child.before.decode()) before = str(child.before.decode())
@ -709,7 +693,7 @@ def test_multi_subactors_root_errors(
# due to block list strat from #337, this will no longer # due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree. # propagate before the root errors and cancels the spawner sub-tree.
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# only if the blocking condition doesn't kick in fast enough # only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode()) before = str(child.before.decode())
@ -724,7 +708,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# check if the spawner crashed or was blocked from debug # check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error # and if this intermediary attached check the boxed error
@ -741,7 +725,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# expect a root actor crash # expect a root actor crash
assert_before(child, [ assert_before(child, [
@ -786,9 +770,11 @@ def test_multi_nested_subactors_error_through_nurseries(
child = spawn('multi_nested_subactors_error_up_through_nurseries') child = spawn('multi_nested_subactors_error_up_through_nurseries')
timed_out_early: bool = False
for send_char in itertools.cycle(['c', 'q']): for send_char in itertools.cycle(['c', 'q']):
try: try:
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
child.sendline(send_char) child.sendline(send_char)
time.sleep(0.01) time.sleep(0.01)
@ -830,7 +816,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock') child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before assert "NameError: name 'doggypants' is not defined" in before
@ -845,7 +831,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(4): for i in range(4):
time.sleep(0.5) time.sleep(0.5)
try: try:
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
except ( except (
EOF, EOF,
@ -902,7 +888,7 @@ def test_root_cancels_child_context_during_startup(
''' '''
child = spawn('fast_error_in_root_after_spawn') child = spawn('fast_error_in_root_after_spawn')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "AssertionError" in before assert "AssertionError" in before
@ -919,7 +905,7 @@ def test_different_debug_mode_per_actor(
ctlc: bool, ctlc: bool,
): ):
child = spawn('per_actor_debug') child = spawn('per_actor_debug')
child.expect(r"\(Pdb\+\+\)") child.expect(PROMPT)
# only one actor should enter the debugger # only one actor should enter the debugger
before = str(child.before.decode()) before = str(child.before.decode())

View File

@ -12,17 +12,17 @@ import shutil
import pytest import pytest
from conftest import repodir from conftest import (
examples_dir,
)
def examples_dir():
"""Return the abspath to the examples directory.
"""
return os.path.join(repodir(), 'examples')
@pytest.fixture @pytest.fixture
def run_example_in_subproc(loglevel, testdir, arb_addr): def run_example_in_subproc(
loglevel: str,
testdir,
arb_addr: tuple[str, int],
):
@contextmanager @contextmanager
def run(script_code): def run(script_code):
@ -32,8 +32,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
# on windows we need to create a special __main__.py which will # on windows we need to create a special __main__.py which will
# be executed with ``python -m <modulename>`` on windows.. # be executed with ``python -m <modulename>`` on windows..
shutil.copyfile( shutil.copyfile(
os.path.join(examples_dir(), '__main__.py'), examples_dir() / '__main__.py',
os.path.join(str(testdir), '__main__.py') str(testdir / '__main__.py'),
) )
# drop the ``if __name__ == '__main__'`` guard onwards from # drop the ``if __name__ == '__main__'`` guard onwards from
@ -88,21 +88,20 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
and f[0] != '_' and f[0] != '_'
and 'debugging' not in p[0] and 'debugging' not in p[0]
and 'integration' not in p[0] and 'integration' not in p[0]
and 'advanced_faults' not in p[0]
], ],
ids=lambda t: t[1], ids=lambda t: t[1],
) )
def test_example(run_example_in_subproc, example_script): def test_example(run_example_in_subproc, example_script):
''' """Load and run scripts from this repo's ``examples/`` dir as a user
Load and run scripts from this repo's ``examples/`` dir as a user
would copy and pasing them into their editor. would copy and pasing them into their editor.
On windows a little more "finessing" is done to make On windows a little more "finessing" is done to make
``multiprocessing`` play nice: we copy the ``__main__.py`` into the ``multiprocessing`` play nice: we copy the ``__main__.py`` into the
test directory and invoke the script as a module with ``python -m test directory and invoke the script as a module with ``python -m
test_example``. test_example``.
"""
'''
ex_file = os.path.join(*example_script) ex_file = os.path.join(*example_script)
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9): if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
@ -112,20 +111,13 @@ def test_example(run_example_in_subproc, example_script):
code = ex.read() code = ex.read()
with run_example_in_subproc(code) as proc: with run_example_in_subproc(code) as proc:
try: proc.wait()
proc.wait(timeout=5) err, _ = proc.stderr.read(), proc.stdout.read()
finally: # print(f'STDERR: {err}')
err = proc.stderr.read() # print(f'STDOUT: {out}')
errmsg = err.decode()
out = proc.stdout.read()
outmsg = out.decode()
if out:
print(f'STDOUT: {out.decode()}')
# if we get some gnarly output let's aggregate and raise # if we get some gnarly output let's aggregate and raise
if err: if err:
print(f'STDERR:\n{errmsg}')
errmsg = err.decode() errmsg = err.decode()
errlines = errmsg.splitlines() errlines = errmsg.splitlines()
last_error = errlines[-1] last_error = errlines[-1]

View File

@ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
results, diff = time_quad_ex results, diff = time_quad_ex
assert results assert results
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666 this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
assert diff < this_fast assert diff < this_fast

View File

@ -1,167 +0,0 @@
"""
Shared mem primitives and APIs.
"""
import uuid
# import numpy
import pytest
import trio
import tractor
from tractor._shm import (
open_shm_list,
attach_shm_list,
)
@tractor.context
async def child_attach_shml_alot(
ctx: tractor.Context,
shm_key: str,
) -> None:
await ctx.started(shm_key)
# now try to attach a boatload of times in a loop..
for _ in range(1000):
shml = attach_shm_list(
key=shm_key,
readonly=False,
)
assert shml.shm.name == shm_key
await trio.sleep(0.001)
def test_child_attaches_alot():
async def main():
async with tractor.open_nursery() as an:
# allocate writeable list in parent
key = f'shml_{uuid.uuid4()}'
shml = open_shm_list(
key=key,
)
portal = await an.start_actor(
'shm_attacher',
enable_modules=[__name__],
)
async with (
portal.open_context(
child_attach_shml_alot,
shm_key=shml.key,
) as (ctx, start_val),
):
assert start_val == key
await ctx.result()
await portal.cancel_actor()
trio.run(main)
@tractor.context
async def child_read_shm_list(
ctx: tractor.Context,
shm_key: str,
use_str: bool,
frame_size: int,
) -> None:
# attach in child
shml = attach_shm_list(
key=shm_key,
# dtype=str if use_str else float,
)
await ctx.started(shml.key)
async with ctx.open_stream() as stream:
async for i in stream:
print(f'(child): reading shm list index: {i}')
if use_str:
expect = str(float(i))
else:
expect = float(i)
if frame_size == 1:
val = shml[i]
assert expect == val
print(f'(child): reading value: {val}')
else:
frame = shml[i - frame_size:i]
print(f'(child): reading frame: {frame}')
@pytest.mark.parametrize(
'use_str',
[False, True],
ids=lambda i: f'use_str_values={i}',
)
@pytest.mark.parametrize(
'frame_size',
[1, 2**6, 2**10],
ids=lambda i: f'frame_size={i}',
)
def test_parent_writer_child_reader(
use_str: bool,
frame_size: int,
):
async def main():
async with tractor.open_nursery(
# debug_mode=True,
) as an:
portal = await an.start_actor(
'shm_reader',
enable_modules=[__name__],
debug_mode=True,
)
# allocate writeable list in parent
key = 'shm_list'
seq_size = int(2 * 2 ** 10)
shml = open_shm_list(
key=key,
size=seq_size,
dtype=str if use_str else float,
readonly=False,
)
async with (
portal.open_context(
child_read_shm_list,
shm_key=key,
use_str=use_str,
frame_size=frame_size,
) as (ctx, sent),
ctx.open_stream() as stream,
):
assert sent == key
for i in range(seq_size):
val = float(i)
if use_str:
val = str(val)
# print(f'(parent): writing {val}')
shml[i] = val
# only on frame fills do we
# signal to the child that a frame's
# worth is ready.
if (i % frame_size) == 0:
print(f'(parent): signalling frame full on {val}')
await stream.send(i)
else:
print(f'(parent): signalling final frame on {val}')
await stream.send(i)
await portal.cancel_actor()
trio.run(main)

View File

@ -12,7 +12,10 @@ import pytest
import trio import trio
from trio.lowlevel import current_task from trio.lowlevel import current_task
import tractor import tractor
from tractor.trionics import broadcast_receiver, Lagged from tractor.trionics import (
broadcast_receiver,
Lagged,
)
@tractor.context @tractor.context
@ -37,7 +40,7 @@ async def echo_sequences(
async def ensure_sequence( async def ensure_sequence(
stream: tractor.ReceiveMsgStream, stream: tractor.MsgStream,
sequence: list, sequence: list,
delay: Optional[float] = None, delay: Optional[float] = None,
@ -211,7 +214,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
arb_addr, arb_addr,
start_method, start_method,
): ):
'''Ensure that if a faster task consuming from a stream is cancelled '''
Ensure that if a faster task consuming from a stream is cancelled
the slower task can continue to receive all expected values. the slower task can continue to receive all expected values.
''' '''
@ -460,3 +464,51 @@ def test_first_recver_is_cancelled():
assert value == 1 assert value == 1
trio.run(main) trio.run(main)
def test_no_raise_on_lag():
'''
Run a simple 2-task broadcast where one task is slow but configured
so that it does not raise `Lagged` on overruns using
`raise_on_lasg=False` and verify that the task does not raise.
'''
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
async def slow():
async with brx.subscribe(
raise_on_lag=False,
) as br:
async for msg in br:
print(f'slow task got: {msg}')
await trio.sleep(0.1)
async def fast():
async with brx.subscribe() as br:
async for msg in br:
print(f'fast task got: {msg}')
async def main():
async with (
tractor.open_root_actor(
# NOTE: so we see the warning msg emitted by the bcaster
# internals when the no raise flag is set.
loglevel='warning',
),
trio.open_nursery() as n,
):
n.start_soon(slow)
n.start_soon(fast)
for i in range(1000):
await tx.send(i)
# simulate user nailing ctl-c after realizing
# there's a lag in the slow task.
await trio.sleep(1)
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@ -24,7 +24,6 @@ from ._clustering import open_actor_cluster
from ._ipc import Channel from ._ipc import Channel
from ._streaming import ( from ._streaming import (
Context, Context,
ReceiveMsgStream,
MsgStream, MsgStream,
stream, stream,
context, context,
@ -45,7 +44,10 @@ from ._exceptions import (
ModuleNotExposed, ModuleNotExposed,
ContextCancelled, ContextCancelled,
) )
from ._debug import breakpoint, post_mortem from ._debug import (
breakpoint,
post_mortem,
)
from . import msg from . import msg
from ._root import ( from ._root import (
run_daemon, run_daemon,
@ -64,7 +66,6 @@ __all__ = [
'MsgStream', 'MsgStream',
'BaseExceptionGroup', 'BaseExceptionGroup',
'Portal', 'Portal',
'ReceiveMsgStream',
'RemoteActorError', 'RemoteActorError',
'breakpoint', 'breakpoint',
'context', 'context',

View File

@ -20,9 +20,13 @@ Multi-core debugging for da peeps!
""" """
from __future__ import annotations from __future__ import annotations
import bdb import bdb
import os
import sys import sys
import signal import signal
from functools import partial from functools import (
partial,
cached_property,
)
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from typing import ( from typing import (
Any, Any,
@ -33,6 +37,7 @@ from typing import (
) )
from types import FrameType from types import FrameType
import pdbp
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -49,17 +54,6 @@ from ._exceptions import (
) )
from ._ipc import Channel from ._ipc import Channel
try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff
import pdb
xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__) log = get_logger(__name__)
@ -73,6 +67,7 @@ class Lock:
Mostly to avoid a lot of ``global`` declarations for now XD. Mostly to avoid a lot of ``global`` declarations for now XD.
''' '''
repl: MultiActorPdb | None = None
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None # pdb_release_hook: Optional[Callable] = None
@ -111,7 +106,7 @@ class Lock:
def shield_sigint(cls): def shield_sigint(cls):
cls._orig_sigint_handler = signal.signal( cls._orig_sigint_handler = signal.signal(
signal.SIGINT, signal.SIGINT,
shield_sigint, shield_sigint_handler,
) )
@classmethod @classmethod
@ -146,24 +141,29 @@ class Lock:
finally: finally:
# restore original sigint handler # restore original sigint handler
cls.unshield_sigint() cls.unshield_sigint()
cls.repl = None
class TractorConfig(pdbpp.DefaultConfig): class TractorConfig(pdbp.DefaultConfig):
''' '''
Custom ``pdbpp`` goodness. Custom ``pdbp`` goodness :surfer:
''' '''
# use_pygments = True use_pygments: bool = True
# sticky_by_default = True sticky_by_default: bool = False
enable_hidden_frames = False enable_hidden_frames: bool = False
# much thanks @mdmintz for the hot tip!
# fixes line spacing issue when resizing terminal B)
truncate_long_lines: bool = False
class MultiActorPdb(pdbpp.Pdb): class MultiActorPdb(pdbp.Pdb):
''' '''
Add teardown hooks to the regular ``pdbpp.Pdb``. Add teardown hooks to the regular ``pdbp.Pdb``.
''' '''
# override the pdbpp config with our coolio one # override the pdbp config with our coolio one
DefaultConfig = TractorConfig DefaultConfig = TractorConfig
# def preloop(self): # def preloop(self):
@ -184,6 +184,35 @@ class MultiActorPdb(pdbpp.Pdb):
finally: finally:
Lock.release() Lock.release()
# XXX NOTE: we only override this because apparently the stdlib pdb
# bois likes to touch the SIGINT handler as much as i like to touch
# my d$%&.
def _cmdloop(self):
self.cmdloop()
@cached_property
def shname(self) -> str | None:
'''
Attempt to return the login shell name with a special check for
the infamous `xonsh` since it seems to have some issues much
different from std shells when it comes to flushing the prompt?
'''
# SUPER HACKY and only really works if `xonsh` is not used
# before spawning further sub-shells..
shpath = os.getenv('SHELL', None)
if shpath:
if (
os.getenv('XONSH_LOGIN', default=False)
or 'xonsh' in shpath
):
return 'xonsh'
return os.path.basename(shpath)
return None
@acm @acm
async def _acquire_debug_lock_from_root_task( async def _acquire_debug_lock_from_root_task(
@ -278,7 +307,7 @@ async def lock_tty_for_child(
) -> str: ) -> str:
''' '''
Lock the TTY in the root process of an actor tree in a new Lock the TTY in the root process of an actor tree in a new
inter-actor-context-task such that the ``pdbpp`` debugger console inter-actor-context-task such that the ``pdbp`` debugger console
can be mutex-allocated to the calling sub-actor for REPL control can be mutex-allocated to the calling sub-actor for REPL control
without interference by other processes / threads. without interference by other processes / threads.
@ -388,6 +417,7 @@ async def wait_for_parent_stdin_hijack(
except ContextCancelled: except ContextCancelled:
log.warning('Root actor cancelled debug lock') log.warning('Root actor cancelled debug lock')
raise
finally: finally:
Lock.local_task_in_debug = None Lock.local_task_in_debug = None
@ -397,7 +427,7 @@ async def wait_for_parent_stdin_hijack(
def mk_mpdb() -> tuple[MultiActorPdb, Callable]: def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb() pdb = MultiActorPdb()
# signal.signal = pdbpp.hideframe(signal.signal) # signal.signal = pdbp.hideframe(signal.signal)
Lock.shield_sigint() Lock.shield_sigint()
@ -435,7 +465,10 @@ async def _breakpoint(
# with trio.CancelScope(shield=shield): # with trio.CancelScope(shield=shield):
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
if not Lock.local_pdb_complete or Lock.local_pdb_complete.is_set(): if (
not Lock.local_pdb_complete
or Lock.local_pdb_complete.is_set()
):
Lock.local_pdb_complete = trio.Event() Lock.local_pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor # TODO: need a more robust check for the "root" actor
@ -484,6 +517,7 @@ async def _breakpoint(
wait_for_parent_stdin_hijack, wait_for_parent_stdin_hijack,
actor.uid, actor.uid,
) )
Lock.repl = pdb
except RuntimeError: except RuntimeError:
Lock.release() Lock.release()
@ -522,6 +556,7 @@ async def _breakpoint(
Lock.global_actor_in_debug = actor.uid Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task_name Lock.local_task_in_debug = task_name
Lock.repl = pdb
try: try:
# block here one (at the appropriate frame *up*) where # block here one (at the appropriate frame *up*) where
@ -542,13 +577,13 @@ async def _breakpoint(
# # frame = sys._getframe() # # frame = sys._getframe()
# # last_f = frame.f_back # # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True # # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbpp.hideframe(signal.signal) # # signal.signal = pdbp.hideframe(signal.signal)
def shield_sigint( def shield_sigint_handler(
signum: int, signum: int,
frame: 'frame', # type: ignore # noqa frame: 'frame', # type: ignore # noqa
pdb_obj: Optional[MultiActorPdb] = None, # pdb_obj: Optional[MultiActorPdb] = None,
*args, *args,
) -> None: ) -> None:
@ -565,6 +600,7 @@ def shield_sigint(
uid_in_debug = Lock.global_actor_in_debug uid_in_debug = Lock.global_actor_in_debug
actor = tractor.current_actor() actor = tractor.current_actor()
# print(f'{actor.uid} in HANDLER with ')
def do_cancel(): def do_cancel():
# If we haven't tried to cancel the runtime then do that instead # If we haven't tried to cancel the runtime then do that instead
@ -598,6 +634,9 @@ def shield_sigint(
) )
return do_cancel() return do_cancel()
# only set in the actor actually running the REPL
pdb_obj = Lock.repl
# root actor branch that reports whether or not a child # root actor branch that reports whether or not a child
# has locked debugger. # has locked debugger.
if ( if (
@ -612,10 +651,12 @@ def shield_sigint(
): ):
# we are root and some actor is in debug mode # we are root and some actor is in debug mode
# if uid_in_debug is not None: # if uid_in_debug is not None:
if pdb_obj:
name = uid_in_debug[0] name = uid_in_debug[0]
if name != 'root': if name != 'root':
log.pdb( log.pdb(
f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`" f"Ignoring SIGINT, child in debug mode: `{uid_in_debug}`"
) )
else: else:
@ -625,19 +666,19 @@ def shield_sigint(
elif ( elif (
is_root_process() is_root_process()
): ):
if pdb_obj:
log.pdb( log.pdb(
"Ignoring SIGINT since debug mode is enabled" "Ignoring SIGINT since debug mode is enabled"
) )
# revert back to ``trio`` handler asap!
Lock.unshield_sigint()
if ( if (
Lock._root_local_task_cs_in_debug Lock._root_local_task_cs_in_debug
and not Lock._root_local_task_cs_in_debug.cancel_called and not Lock._root_local_task_cs_in_debug.cancel_called
): ):
Lock._root_local_task_cs_in_debug.cancel() Lock._root_local_task_cs_in_debug.cancel()
# raise KeyboardInterrupt # revert back to ``trio`` handler asap!
Lock.unshield_sigint()
# child actor that has locked the debugger # child actor that has locked the debugger
elif not is_root_process(): elif not is_root_process():
@ -653,7 +694,10 @@ def shield_sigint(
return do_cancel() return do_cancel()
task = Lock.local_task_in_debug task = Lock.local_task_in_debug
if task: if (
task
and pdb_obj
):
log.pdb( log.pdb(
f"Ignoring SIGINT while task in debug mode: `{task}`" f"Ignoring SIGINT while task in debug mode: `{task}`"
) )
@ -668,14 +712,21 @@ def shield_sigint(
raise KeyboardInterrupt raise KeyboardInterrupt
# NOTE: currently (at least on ``fancycompleter`` 0.9.2) # NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it lookks to be that the last command that was run (eg. ll) # it looks to be that the last command that was run (eg. ll)
# will be repeated by default. # will be repeated by default.
# TODO: maybe redraw/print last REPL output to console # maybe redraw/print last REPL output to console since
# we want to alert the user that more input is expect since
# nothing has been done dur to ignoring sigint.
if ( if (
pdb_obj pdb_obj # only when this actor has a REPL engaged
and sys.version_info <= (3, 10)
): ):
# XXX: yah, mega hack, but how else do we catch this madness XD
if pdb_obj.shname == 'xonsh':
pdb_obj.stdout.write(pdb_obj.prompt)
pdb_obj.stdout.flush()
# TODO: make this work like sticky mode where if there is output # TODO: make this work like sticky mode where if there is output
# detected as written to the tty we redraw this part underneath # detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above? # and erase the past draw of this same bit above?
@ -686,21 +737,13 @@ def shield_sigint(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
# XXX: lol, see ``pdbpp`` issue: # XXX LEGACY: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496 # https://github.com/pdbpp/pdbpp/issues/496
# TODO: pretty sure this is what we should expect to have to run
# in total but for now we're just going to wait until `pdbpp`
# figures out it's own stuff on 3.10 (and maybe we'll help).
# pdb_obj.do_longlist(None)
# XXX: we were doing this but it shouldn't be required..
print(pdb_obj.prompt, end='', flush=True)
def _set_trace( def _set_trace(
actor: Optional[tractor.Actor] = None, actor: tractor.Actor | None = None,
pdb: Optional[MultiActorPdb] = None, pdb: MultiActorPdb | None = None,
): ):
__tracebackhide__ = True __tracebackhide__ = True
actor = actor or tractor.current_actor() actor = actor or tractor.current_actor()
@ -710,7 +753,11 @@ def _set_trace(
if frame: if frame:
frame = frame.f_back # type: ignore frame = frame.f_back # type: ignore
if frame and pdb and actor is not None: if (
frame
and pdb
and actor is not None
):
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
# no f!#$&* idea, but when we're in async land # no f!#$&* idea, but when we're in async land
# we need 2x frames up? # we need 2x frames up?
@ -719,7 +766,8 @@ def _set_trace(
else: else:
pdb, undo_sigint = mk_mpdb() pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync code? # we entered the global ``breakpoint()`` built-in from sync
# code?
Lock.local_task_in_debug = 'sync' Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame) pdb.set_trace(frame=frame)
@ -749,7 +797,7 @@ def _post_mortem(
# https://github.com/pdbpp/pdbpp/issues/480 # https://github.com/pdbpp/pdbpp/issues/480
# TODO: help with a 3.10+ major release if/when it arrives. # TODO: help with a 3.10+ major release if/when it arrives.
pdbpp.xpm(Pdb=lambda: pdb) pdbp.xpm(Pdb=lambda: pdb)
post_mortem = partial( post_mortem = partial(
@ -820,7 +868,10 @@ async def maybe_wait_for_debugger(
) -> None: ) -> None:
if not debug_mode() and not child_in_debug: if (
not debug_mode()
and not child_in_debug
):
return return
if ( if (

View File

@ -45,7 +45,10 @@ from ._exceptions import (
NoResult, NoResult,
ContextCancelled, ContextCancelled,
) )
from ._streaming import Context, ReceiveMsgStream from ._streaming import (
Context,
MsgStream,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -101,7 +104,7 @@ class Portal:
# it is expected that ``result()`` will be awaited at some # it is expected that ``result()`` will be awaited at some
# point. # point.
self._expect_result: Optional[Context] = None self._expect_result: Optional[Context] = None
self._streams: set[ReceiveMsgStream] = set() self._streams: set[MsgStream] = set()
self.actor = current_actor() self.actor = current_actor()
async def _submit_for_result( async def _submit_for_result(
@ -316,7 +319,7 @@ class Portal:
async_gen_func: Callable, # typing: ignore async_gen_func: Callable, # typing: ignore
**kwargs, **kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func): if not inspect.isasyncgenfunction(async_gen_func):
if not ( if not (
@ -341,7 +344,7 @@ class Portal:
try: try:
# deliver receive only stream # deliver receive only stream
async with ReceiveMsgStream( async with MsgStream(
ctx, ctx._recv_chan, ctx, ctx._recv_chan,
) as rchan: ) as rchan:
self._streams.add(rchan) self._streams.add(rchan)
@ -497,6 +500,10 @@ class Portal:
f'actor: {uid}' f'actor: {uid}'
) )
result = await ctx.result() result = await ctx.result()
log.runtime(
f'Context {fn_name} returned '
f'value from callee `{result}`'
)
# though it should be impossible for any tasks # though it should be impossible for any tasks
# operating *in* this scope to have survived # operating *in* this scope to have survived
@ -518,12 +525,6 @@ class Portal:
f'task:{cid}\n' f'task:{cid}\n'
f'actor:{uid}' f'actor:{uid}'
) )
else:
log.runtime(
f'Context {fn_name} returned '
f'value from callee `{result}`'
)
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside

View File

@ -22,11 +22,9 @@ from contextlib import asynccontextmanager
from functools import partial from functools import partial
import importlib import importlib
import logging import logging
import os
import signal import signal
from typing import ( import sys
Optional, import os
)
import typing import typing
import warnings import warnings
@ -58,27 +56,28 @@ logger = log.get_logger('tractor')
@asynccontextmanager @asynccontextmanager
async def open_root_actor( async def open_root_actor(
*,
# defaults are above # defaults are above
arbiter_addr: Optional[tuple[str, int]] = ( arbiter_addr: tuple[str, int] | None = None,
_default_arbiter_host,
_default_arbiter_port,
),
name: Optional[str] = 'root', # defaults are above
registry_addr: tuple[str, int] | None = None,
name: str | None = 'root',
# either the `multiprocessing` start method: # either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default). # OR `trio` (the new default).
start_method: Optional[_spawn.SpawnMethodKey] = None, start_method: _spawn.SpawnMethodKey | None = None,
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
# internal logging # internal logging
loglevel: Optional[str] = None, loglevel: str | None = None,
enable_modules: Optional[list] = None, enable_modules: list | None = None,
rpc_module_paths: Optional[list] = None, rpc_module_paths: list | None = None,
) -> typing.Any: ) -> typing.Any:
''' '''
@ -86,8 +85,10 @@ async def open_root_actor(
''' '''
# Override the global debugger hook to make it play nice with # Override the global debugger hook to make it play nice with
# ``trio``, see: # ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
@ -112,10 +113,22 @@ async def open_root_actor(
if start_method is not None: if start_method is not None:
_spawn.try_set_start_method(start_method) _spawn.try_set_start_method(start_method)
arbiter_addr = (host, port) = arbiter_addr or ( if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated and has been renamed to'
'`registry_addr`.\nUse that instead..',
DeprecationWarning,
stacklevel=2,
)
registry_addr = (host, port) = (
registry_addr
or arbiter_addr
or (
_default_arbiter_host, _default_arbiter_host,
_default_arbiter_port, _default_arbiter_port,
) )
)
loglevel = (loglevel or log._default_loglevel).upper() loglevel = (loglevel or log._default_loglevel).upper()
@ -160,7 +173,7 @@ async def open_root_actor(
except OSError: except OSError:
# TODO: make this a "discovery" log level? # TODO: make this a "discovery" log level?
logger.warning(f"No actor could be found @ {host}:{port}") logger.warning(f"No actor registry found @ {host}:{port}")
# create a local actor and start up its main routine/task # create a local actor and start up its main routine/task
if arbiter_found: if arbiter_found:
@ -170,7 +183,7 @@ async def open_root_actor(
actor = Actor( actor = Actor(
name or 'anonymous', name or 'anonymous',
arbiter_addr=arbiter_addr, arbiter_addr=registry_addr,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
) )
@ -186,7 +199,7 @@ async def open_root_actor(
actor = Arbiter( actor = Arbiter(
name or 'arbiter', name or 'arbiter',
arbiter_addr=arbiter_addr, arbiter_addr=registry_addr,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
) )
@ -244,6 +257,15 @@ async def open_root_actor(
await actor.cancel() await actor.cancel()
finally: finally:
_state._current_actor = None _state._current_actor = None
# restore breakpoint hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT')
logger.runtime("Root actor terminated") logger.runtime("Root actor terminated")
@ -251,13 +273,13 @@ def run_daemon(
enable_modules: list[str], enable_modules: list[str],
# runtime kwargs # runtime kwargs
name: Optional[str] = 'root', name: str | None = 'root',
arbiter_addr: tuple[str, int] = ( registry_addr: tuple[str, int] = (
_default_arbiter_host, _default_arbiter_host,
_default_arbiter_port, _default_arbiter_port,
), ),
start_method: Optional[str] = None, start_method: str | None = None,
debug_mode: bool = False, debug_mode: bool = False,
**kwargs **kwargs
@ -279,7 +301,7 @@ def run_daemon(
async def _main(): async def _main():
async with open_root_actor( async with open_root_actor(
arbiter_addr=arbiter_addr, registry_addr=registry_addr,
name=name, name=name,
start_method=start_method, start_method=start_method,
debug_mode=debug_mode, debug_mode=debug_mode,

View File

@ -228,11 +228,11 @@ async def _invoke(
fname = func.__name__ fname = func.__name__
if ctx._cancel_called: if ctx._cancel_called:
msg = f'{fname} cancelled itself' msg = f'`{fname}()` cancelled itself'
elif cs.cancel_called: elif cs.cancel_called:
msg = ( msg = (
f'{fname} was remotely cancelled by its caller ' f'`{fname}()` was remotely cancelled by its caller '
f'{ctx.chan.uid}' f'{ctx.chan.uid}'
) )
@ -319,7 +319,7 @@ async def _invoke(
BrokenPipeError, BrokenPipeError,
): ):
# if we can't propagate the error that's a big boo boo # if we can't propagate the error that's a big boo boo
log.error( log.exception(
f"Failed to ship error to caller @ {chan.uid} !?" f"Failed to ship error to caller @ {chan.uid} !?"
) )
@ -455,7 +455,7 @@ class Actor:
self._mods: dict[str, ModuleType] = {} self._mods: dict[str, ModuleType] = {}
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = ( self._arb_addr: tuple[str, int] | None = (
str(arbiter_addr[0]), str(arbiter_addr[0]),
int(arbiter_addr[1]) int(arbiter_addr[1])
) if arbiter_addr else None ) if arbiter_addr else None
@ -1384,6 +1384,7 @@ async def async_main(
and not actor.is_arbiter and not actor.is_arbiter
): ):
failed = False failed = False
assert isinstance(actor._arb_addr, tuple)
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
try: try:
@ -1608,7 +1609,10 @@ async def process_messages(
# handshake for them (yet) and instead we simply bail out of # handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean # the message loop and expect the teardown sequence to clean
# up. # up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') log.runtime(
f'channel from {chan.uid} closed abruptly:\n'
f'-> {chan.raddr}\n'
)
# transport **was** disconnected # transport **was** disconnected
return True return True

View File

@ -1,828 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
SC friendly shared memory management geared at real-time
processing.
Support for ``numpy`` compatible array-buffers is provided but is
considered optional within the context of this runtime-library.
"""
from __future__ import annotations
from sys import byteorder
import time
from typing import Optional
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
)
from msgspec import Struct
import tractor
from .log import get_logger
_USE_POSIX = getattr(shm, '_USE_POSIX', False)
if _USE_POSIX:
from _posixshmem import shm_unlink
try:
import numpy as np
from numpy.lib import recfunctions as rfn
import nptyping
except ImportError:
pass
log = get_logger(__name__)
def disable_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
disable_mantracker()
class SharedInt:
'''
Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
'''
def __init__(
self,
shm: SharedMemory,
) -> None:
self._shm = shm
@property
def value(self) -> int:
return int.from_bytes(self._shm.buf, byteorder)
@value.setter
def value(self, value) -> None:
self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder)
def destroy(self) -> None:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
name = self._shm.name
try:
shm_unlink(name)
except FileNotFoundError:
# might be a teardown race here?
log.warning(f'Shm for {name} already unlinked?')
class NDToken(Struct, frozen=True):
'''
Internal represenation of a shared memory ``numpy`` array "token"
which can be used to key and load a system (OS) wide shm entry
and correctly read the array by type signature.
This type is msg safe.
'''
shm_name: str # this servers as a "key" value
shm_first_index_name: str
shm_last_index_name: str
dtype_descr: tuple
size: int # in struct-array index / row terms
# TODO: use nptyping here on dtypes
@property
def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]:
return np.dtype(
list(
map(tuple, self.dtype_descr)
)
).descr
def as_msg(self):
return self.to_dict()
@classmethod
def from_msg(cls, msg: dict) -> NDToken:
if isinstance(msg, NDToken):
return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return NDToken(**msg)
# _token_dec = msgspec.msgpack.Decoder(NDToken)
# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
# _known_tokens = trio.RunVar('shms', {})
# TODO: this should maybe be provided via
# a `.trionics.maybe_open_context()` wrapper factory?
# process-local store of keys to tokens
_known_tokens: dict[str, NDToken] = {}
def get_shm_token(key: str) -> NDToken | None:
'''
Convenience func to check if a token
for the provided key is known by this process.
Returns either the ``numpy`` token or a string for a shared list.
'''
return _known_tokens.get(key)
def _make_token(
key: str,
size: int,
dtype: np.dtype,
) -> NDToken:
'''
Create a serializable token that can be used
to access a shared array.
'''
return NDToken(
shm_name=key,
shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last",
dtype_descr=tuple(np.dtype(dtype).descr),
size=size,
)
class ShmArray:
'''
A shared memory ``numpy.ndarray`` API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
'''
def __init__(
self,
shmarr: np.ndarray,
first: SharedInt,
last: SharedInt,
shm: SharedMemory,
# readonly: bool = True,
) -> None:
self._array = shmarr
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr)
self._shm = shm
self._post_init: bool = False
# pushing data does not write the index (aka primary key)
self._write_fields: list[str] | None = None
dtype = shmarr.dtype
if dtype.fields:
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api?
@property
def _token(self) -> NDToken:
return NDToken(
shm_name=self._shm.name,
shm_first_index_name=self._first._shm.name,
shm_last_index_name=self._last._shm.name,
dtype_descr=tuple(self._array.dtype.descr),
size=self._len,
)
@property
def token(self) -> dict:
"""Shared memory token that can be serialized and used by
another process to attach to this array.
"""
return self._token.as_msg()
@property
def index(self) -> int:
return self._last.value % self._len
@property
def array(self) -> np.ndarray:
'''
Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer.
'''
a = self._array[self._first.value:self._last.value]
# first, last = self._first.value, self._last.value
# a = self._array[first:last]
# TODO: eventually comment this once we've not seen it in the
# wild in a long time..
# XXX: race where first/last indexes cause a reader
# to load an empty array..
if len(a) == 0 and self._post_init:
raise RuntimeError('Empty array race condition hit!?')
# breakpoint()
return a
def ustruct(
self,
fields: Optional[list[str]] = None,
# type that all field values will be cast to in the returned
# view.
common_dtype: np.dtype = np.float64, # type: ignore
) -> np.ndarray:
array = self._array
if fields:
selection = array[fields]
# fcount = len(fields)
else:
selection = array
# fcount = len(array.dtype.fields)
# XXX: manual ``.view()`` attempt that also doesn't work.
# uview = selection.view(
# dtype='<f16',
# ).reshape(-1, 4, order='A')
# assert len(selection) == len(uview)
u = rfn.structured_to_unstructured(
selection,
# dtype=float,
copy=True,
)
# unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
# array[:] = a[:]
return u
# return ShmArray(
# shmarr=u,
# first=self._first,
# last=self._last,
# shm=self._shm
# )
def last(
self,
length: int = 1,
) -> np.ndarray:
'''
Return the last ``length``'s worth of ("row") entries from the
array.
'''
return self.array[-length:]
def push(
self,
data: np.ndarray,
field_map: Optional[dict[str, str]] = None,
prepend: bool = False,
update_first: bool = True,
start: Optional[int] = None,
) -> int:
'''
Ring buffer like "push" to append data
into the buffer and return updated "last" index.
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
length = len(data)
if prepend:
index = (start or self._first.value) - length
if index < 0:
raise ValueError(
f'Array size of {self._len} was overrun during prepend.\n'
f'You have passed {abs(index)} too many datums.'
)
else:
index = start if start is not None else self._last.value
end = index + length
if field_map:
src_names, dst_names = zip(*field_map.items())
else:
dst_names = src_names = self._write_fields
try:
self._array[
list(dst_names)
][index:end] = data[list(src_names)][:]
# NOTE: there was a race here between updating
# the first and last indices and when the next reader
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if prepend and update_first and length:
assert index < self._first.value
if (
index < self._first.value
and update_first
):
assert prepend, 'prepend=True not passed but index decreased?'
self._first.value = index
elif not prepend:
self._last.value = end
self._post_init = True
return end
except ValueError as err:
if field_map:
raise
# should raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
set(data.dtype.fields),
)
only_in_ours = our_fields - their_fields
only_in_theirs = their_fields - our_fields
if only_in_ours:
raise TypeError(
f"Input array is missing field(s): {only_in_ours}"
)
elif only_in_theirs:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
# TODO: support "silent" prepends that don't update ._first.value?
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None:
self._first._shm.close()
self._last._shm.close()
self._shm.close()
def destroy(self) -> None:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
self._first.destroy()
self._last.destroy()
def flush(self) -> None:
# TODO: flush to storage backend like markestore?
...
def open_shm_ndarray(
key: Optional[str] = None,
size: int = int(2 ** 10),
dtype: np.dtype | None = None,
append_start_index: int = 0,
readonly: bool = False,
) -> ShmArray:
'''
Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown
and thus should be used from the parent-most accessor (process).
'''
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key,
create=True,
size=a.nbytes
)
array = np.ndarray(
a.shape,
dtype=a.dtype,
buffer=shm.buf
)
array[:] = a[:]
array.setflags(write=int(not readonly))
token = _make_token(
key=key,
size=size,
dtype=dtype,
)
# create single entry arrays for storing an first and last indices
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=True,
size=4, # std int
)
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
# Start the "real-time" append-updated (or "pushed-to") section
# after some start index: ``append_start_index``. This allows appending
# from a start point in the array which isn't the 0 index and looks
# something like,
# -------------------------
# | | i
# _________________________
# <-------------> <------->
# history real-time
#
# Once fully "prepended", the history section will leave the
# ``ShmArray._start.value: int = 0`` and the yet-to-be written
# real-time section will start at ``ShmArray.index: int``.
# this sets the index to 3/4 of the length of the buffer
# leaving a "days worth of second samples" for the real-time
# section.
last.value = first.value = append_start_index
shmarr = ShmArray(
array,
first,
last,
shm,
)
assert shmarr._token == token
_known_tokens[key] = shmarr.token
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
stack = tractor.current_actor().lifetime_stack
stack.callback(shmarr.close)
stack.callback(shmarr.destroy)
return shmarr
def attach_shm_ndarray(
token: tuple[str, str, tuple[str, str]],
readonly: bool = True,
) -> ShmArray:
'''
Attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
'''
token = NDToken.from_msg(token)
key = token.shm_name
if key in _known_tokens:
assert NDToken.from_msg(_known_tokens[key]) == token, "WTF"
# XXX: ugh, looks like due to the ``shm_open()`` C api we can't
# actually place files in a subdir, see discussion here:
# https://stackoverflow.com/a/11103289
# attach to array buffer and view as per dtype
_err: Optional[Exception] = None
for _ in range(3):
try:
shm = SharedMemory(
name=key,
create=False,
)
break
except OSError as oserr:
_err = oserr
time.sleep(0.1)
else:
if _err:
raise _err
shmarr = np.ndarray(
(token.size,),
dtype=token.dtype,
buffer=shm.buf
)
shmarr.setflags(write=int(not readonly))
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read
first.value
sha = ShmArray(
shmarr,
first,
last,
shm,
)
# read test
sha.array
# Stash key -> token knowledge for future queries
# via `maybe_opepn_shm_array()` but only after we know
# we can attach.
if key not in _known_tokens:
_known_tokens[key] = token
# "close" attached shm on actor teardown
tractor.current_actor().lifetime_stack.callback(sha.close)
return sha
def maybe_open_shm_ndarray(
key: str, # unique identifier for segment
# from ``open_shm_array()``
size: int = int(2 ** 10), # array length in index terms
dtype: np.dtype | None = None,
append_start_index: int = 0,
readonly: bool = True,
) -> tuple[ShmArray, bool]:
'''
Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registry
(presumes you don't have the block's explicit token).
This function is meant to solve the problem of discovering whether
a shared array token has been allocated or discovered by the actor
running in **this** process. Systems where multiple actors may seek
to access a common block can use this function to attempt to acquire
a token as discovered by the actors who have previously stored
a "key" -> ``NDToken`` map in an actor local (aka python global)
variable.
If you know the explicit ``NDToken`` for your memory segment instead
use ``attach_shm_array``.
'''
try:
# see if we already know this key
token = _known_tokens[key]
return (
attach_shm_ndarray(
token=token,
readonly=readonly,
),
False, # not newly opened
)
except KeyError:
log.warning(f"Could not find {key} in shms cache")
if dtype:
token = _make_token(
key,
size=size,
dtype=dtype,
)
else:
try:
return (
attach_shm_ndarray(
token=token,
readonly=readonly,
),
False,
)
except FileNotFoundError:
log.warning(f"Could not attach to shm with token {token}")
# This actor does not know about memory
# associated with the provided "key".
# Attempt to open a block and expect
# to fail if a block has been allocated
# on the OS by someone else.
return (
open_shm_ndarray(
key=key,
size=size,
dtype=dtype,
append_start_index=append_start_index,
readonly=readonly,
),
True,
)
class ShmList(ShareableList):
'''
Carbon copy of ``.shared_memory.ShareableList`` with a few
enhancements:
- readonly mode via instance var flag `._readonly: bool`
- ``.__getitem__()`` accepts ``slice`` inputs
- exposes the underlying buffer "name" as a ``.key: str``
'''
def __init__(
self,
sequence: list | None = None,
*,
name: str | None = None,
readonly: bool = True
) -> None:
self._readonly = readonly
self._key = name
return super().__init__(
sequence=sequence,
name=name,
)
@property
def key(self) -> str:
return self._key
@property
def readonly(self) -> bool:
return self._readonly
def __setitem__(
self,
position,
value,
) -> None:
# mimick ``numpy`` error
if self._readonly:
raise ValueError('assignment destination is read-only')
return super().__setitem__(position, value)
def __getitem__(
self,
indexish,
) -> list:
# NOTE: this is a non-writeable view (copy?) of the buffer
# in a new list instance.
if isinstance(indexish, slice):
return list(self)[indexish]
return super().__getitem__(indexish)
# TODO: should we offer a `.array` and `.push()` equivalent
# to the `ShmArray`?
# currently we have the following limitations:
# - can't write slices of input using traditional slice-assign
# syntax due to the ``ShareableList.__setitem__()`` implementation.
# - ``list(shmlist)`` returns a non-mutable copy instead of
# a writeable view which would be handier numpy-style ops.
def open_shm_list(
key: str,
sequence: list | None = None,
size: int = int(2 ** 10),
dtype: float | int | bool | str | bytes | None = float,
readonly: bool = True,
) -> ShmList:
if sequence is None:
default = {
float: 0.,
int: 0,
bool: True,
str: 'doggy',
None: None,
}[dtype]
sequence = [default] * size
shml = ShmList(
sequence=sequence,
name=key,
readonly=readonly,
)
# "close" attached shm on actor teardown
try:
actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close)
actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps')
return shml
def attach_shm_list(
key: str,
readonly: bool = False,
) -> ShmList:
return ShmList(
name=key,
readonly=readonly,
)

View File

@ -23,13 +23,12 @@ import sys
import platform import platform
from typing import ( from typing import (
Any, Any,
Awaitable,
Literal, Literal,
Optional,
Callable, Callable,
TypeVar, TypeVar,
TYPE_CHECKING, TYPE_CHECKING,
) )
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
import trio import trio
@ -60,7 +59,7 @@ if TYPE_CHECKING:
log = get_logger('tractor') log = get_logger('tractor')
# placeholder for an mp start context if so using that backend # placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None _ctx: mp.context.BaseContext | None = None
SpawnMethodKey = Literal[ SpawnMethodKey = Literal[
'trio', # supported on all platforms 'trio', # supported on all platforms
'mp_spawn', 'mp_spawn',
@ -86,7 +85,7 @@ else:
def try_set_start_method( def try_set_start_method(
key: SpawnMethodKey key: SpawnMethodKey
) -> Optional[mp.context.BaseContext]: ) -> mp.context.BaseContext | None:
''' '''
Attempt to set the method for process starting, aka the "actor Attempt to set the method for process starting, aka the "actor
spawning backend". spawning backend".
@ -200,16 +199,37 @@ async def cancel_on_completion(
async def do_hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
terminate_after: int = 3, terminate_after: int = 3,
) -> None: ) -> None:
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
# a hard-kill time ultimatum. # a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs: with trio.move_on_after(terminate_after) as cs:
# NOTE: This ``__aexit__()`` shields internally. # NOTE: code below was copied verbatim from the now deprecated
async with proc: # calls ``trio.Process.aclose()`` # (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
log.debug(f"Terminating {proc}") # string:
#
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
try:
await proc.wait()
finally:
if proc.returncode is None:
proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have # XXX: should pretty much never get here unless we have
@ -260,7 +280,9 @@ async def soft_wait(
if proc.poll() is None: # type: ignore if proc.poll() is None: # type: ignore
log.warning( log.warning(
f'Process still alive after cancel request:\n{uid}') 'Actor still alive after cancel request:\n'
f'{uid}'
)
n.cancel_scope.cancel() n.cancel_scope.cancel()
raise raise
@ -353,12 +375,11 @@ async def trio_proc(
spawn_cmd.append("--asyncio") spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None proc: trio.Process | None = None
try: try:
try: try:
# TODO: needs ``trio_typing`` patch? # TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process( # type: ignore proc = await trio.lowlevel.open_process(spawn_cmd)
spawn_cmd)
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
@ -442,8 +463,8 @@ async def trio_proc(
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
finally: finally:
# The "hard" reap since no actor zombies are allowed! # XXX NOTE XXX: The "hard" reap since no actor zombies are
# XXX: do this **after** cancellation/tearfown to avoid # allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early. # killing the process too early.
if proc: if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}') log.cancel(f'Hard reap sequence starting for {subactor.uid}')
@ -457,6 +478,13 @@ async def trio_proc(
await proc.wait() await proc.wait()
if is_root_process(): if is_root_process():
# TODO: solve the following issue where we need
# to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't
# in debug but has a child that is, and we need
# to hold off on relaying SIGINT until that child
# is complete.
# https://github.com/goodboy/tractor/issues/320
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False), '_debug_mode', False),

View File

@ -50,12 +50,13 @@ log = get_logger(__name__)
# - use __slots__ on ``Context``? # - use __slots__ on ``Context``?
class ReceiveMsgStream(trio.abc.ReceiveChannel): class MsgStream(trio.abc.Channel):
''' '''
A IPC message stream for receiving logically sequenced values over A bidirectional message stream for receiving logically sequenced
an inter-actor ``Channel``. This is the type returned to a local values over an inter-actor IPC ``Channel``.
task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``. This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``.
Termination rules: Termination rules:
@ -97,6 +98,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
if self._eoc: if self._eoc:
raise trio.EndOfChannel raise trio.EndOfChannel
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
try: try:
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
@ -110,6 +114,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# - 'error' # - 'error'
# possibly just handle msg['stop'] here! # possibly just handle msg['stop'] here!
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or self._eoc: if msg.get('stop') or self._eoc:
log.debug(f"{self} was stopped at remote end") log.debug(f"{self} was stopped at remote end")
@ -189,7 +196,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
return return
self._eoc = True self._eoc = True
self._closed = True
# NOTE: this is super subtle IPC messaging stuff: # NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're # Relay stop iteration to far end **iff** we're
@ -206,12 +212,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# In the bidirectional case, `Context.open_stream()` will create # In the bidirectional case, `Context.open_stream()` will create
# the `Actor._cids2qs` entry from a call to # the `Actor._cids2qs` entry from a call to
# `Actor.get_context()` and will send the stop message in # `Actor.get_context()` and will call us here to send the stop
# ``__aexit__()`` on teardown so it **does not** need to be # msg in ``__aexit__()`` on teardown.
# called here.
if not self._ctx._portal:
# Only for 2 way streams can we can send stop from the
# caller side.
try: try:
# NOTE: if this call is cancelled we expect this end to # NOTE: if this call is cancelled we expect this end to
# handle as though the stop was never sent (though if it # handle as though the stop was never sent (though if it
@ -228,7 +230,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# the underlying channel may already have been pulled # the underlying channel may already have been pulled
# in which case our stop message is meaningless since # in which case our stop message is meaningless since
# it can't traverse the transport. # it can't traverse the transport.
log.debug(f'Channel for {self} was already closed') ctx = self._ctx
log.warning(
f'Stream was already destroyed?\n'
f'actor: {ctx.chan.uid}\n'
f'ctx id: {ctx.cid}'
)
self._closed = True
# Do we close the local mem chan ``self._rx_chan`` ??!? # Do we close the local mem chan ``self._rx_chan`` ??!?
@ -271,7 +280,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self, self,
) -> AsyncIterator[BroadcastReceiver]: ) -> AsyncIterator[BroadcastReceiver]:
'''Allocate and return a ``BroadcastReceiver`` which delegates '''
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream. to this message stream.
This allows multiple local tasks to receive each their own copy This allows multiple local tasks to receive each their own copy
@ -308,15 +318,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
async with self._broadcaster.subscribe() as bstream: async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv assert bstream._recv == self._broadcaster._recv
# NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send # type: ignore
yield bstream yield bstream
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''
Bidirectional message stream for use within an inter-actor actor
``Context```.
'''
async def send( async def send(
self, self,
data: Any data: Any
@ -593,23 +603,23 @@ class Context:
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=ctx._recv_chan, rx_chan=ctx._recv_chan,
) as rchan: ) as stream:
if self._portal: if self._portal:
self._portal._streams.add(rchan) self._portal._streams.add(stream)
try: try:
self._stream_opened = True self._stream_opened = True
# ensure we aren't cancelled before delivering # XXX: do we need this?
# the stream # ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield rchan yield stream
# XXX: Make the stream "one-shot use". On exit, signal # NOTE: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end. # far end.
await self.send_stop() await stream.aclose()
finally: finally:
if self._portal: if self._portal:

View File

@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]: ) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay? # TODO: yay or nay?
# __tracebackhide__ = True __tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors # the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {} errors: dict[tuple[str, str], BaseException] = {}

View File

@ -23,7 +23,6 @@ from __future__ import annotations
from abc import abstractmethod from abc import abstractmethod
from collections import deque from collections import deque
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import partial from functools import partial
from operator import ne from operator import ne
from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol
@ -33,7 +32,10 @@ import trio
from trio._core._run import Task from trio._core._run import Task
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio.lowlevel import current_task from trio.lowlevel import current_task
from msgspec import Struct
from tractor.log import get_logger
log = get_logger(__name__)
# A regular invariant generic type # A regular invariant generic type
T = TypeVar("T") T = TypeVar("T")
@ -86,8 +88,7 @@ class Lagged(trio.TooSlowError):
''' '''
@dataclass class BroadcastState(Struct):
class BroadcastState:
''' '''
Common state to all receivers of a broadcast. Common state to all receivers of a broadcast.
@ -110,7 +111,35 @@ class BroadcastState:
eoc: bool = False eoc: bool = False
# If the broadcaster was cancelled, we might as well track it # If the broadcaster was cancelled, we might as well track it
cancelled: bool = False cancelled: dict[int, Task] = {}
def statistics(self) -> dict[str, Any]:
'''
Return broadcast receiver group "statistics" like many of
``trio``'s internal task-sync primitives.
'''
key: int | None
ev: trio.Event | None
subs = self.subs
if self.recv_ready is not None:
key, ev = self.recv_ready
else:
key = ev = None
qlens: dict[int, int] = {}
for tid, sz in subs.items():
qlens[tid] = sz if sz != -1 else 0
return {
'open_consumers': len(subs),
'queued_len_by_task': qlens,
'max_buffer_size': self.maxlen,
'tasks_waiting': ev.statistics().tasks_waiting if ev else 0,
'tasks_cancelled': self.cancelled,
'next_value_receiver_id': key,
}
class BroadcastReceiver(ReceiveChannel): class BroadcastReceiver(ReceiveChannel):
@ -128,23 +157,40 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan: AsyncReceiver, rx_chan: AsyncReceiver,
state: BroadcastState, state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
) -> None: ) -> None:
# register the original underlying (clone) # register the original underlying (clone)
self.key = id(self) self.key = id(self)
self._state = state self._state = state
# each consumer has an int count which indicates
# which index contains the next value that the task has not yet
# consumed and thus should read. In the "up-to-date" case the
# consumer task must wait for a new value from the underlying
# receiver and we use ``-1`` as the sentinel for this state.
state.subs[self.key] = -1 state.subs[self.key] = -1
# underlying for this receiver # underlying for this receiver
self._rx = rx_chan self._rx = rx_chan
self._recv = receive_afunc or rx_chan.receive self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False self._closed: bool = False
self._raise_on_lag = raise_on_lag
async def receive(self) -> ReceiveType: def receive_nowait(
self,
_key: int | None = None,
_state: BroadcastState | None = None,
key = self.key ) -> Any:
state = self._state '''
Sync version of `.receive()` which does all the low level work
of receiving from the underlying/wrapped receive channel.
'''
key = _key or self.key
state = _state or self._state
# TODO: ideally we can make some way to "lock out" the # TODO: ideally we can make some way to "lock out" the
# underlying receive channel in some way such that if some task # underlying receive channel in some way such that if some task
@ -177,32 +223,47 @@ class BroadcastReceiver(ReceiveChannel):
# return this value." # return this value."
# https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging # https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging
mxln = state.maxlen
lost = seq - mxln
# decrement to the last value and expect # decrement to the last value and expect
# consumer to either handle the ``Lagged`` and come back # consumer to either handle the ``Lagged`` and come back
# or bail out on its own (thus un-subscribing) # or bail out on its own (thus un-subscribing)
state.subs[key] = state.maxlen - 1 state.subs[key] = mxln - 1
# this task was overrun by the producer side # this task was overrun by the producer side
task: Task = current_task() task: Task = current_task()
raise Lagged(f'Task {task.name} was overrun') msg = f'Task `{task.name}` overrun and dropped `{lost}` values'
if self._raise_on_lag:
raise Lagged(msg)
else:
log.warning(msg)
return self.receive_nowait(_key, _state)
state.subs[key] -= 1 state.subs[key] -= 1
return value return value
# current task already has the latest value **and** is the raise trio.WouldBlock
# first task to begin waiting for a new one
if state.recv_ready is None: async def _receive_from_underlying(
self,
key: int,
state: BroadcastState,
) -> ReceiveType:
if self._closed: if self._closed:
raise trio.ClosedResourceError raise trio.ClosedResourceError
event = trio.Event() event = trio.Event()
assert state.recv_ready is None
state.recv_ready = key, event state.recv_ready = key, event
try:
# if we're cancelled here it should be # if we're cancelled here it should be
# fine to bail without affecting any other consumers # fine to bail without affecting any other consumers
# right? # right?
try:
value = await self._recv() value = await self._recv()
# items with lower indices are "newer" # items with lower indices are "newer"
@ -220,7 +281,6 @@ class BroadcastReceiver(ReceiveChannel):
# already retreived the last value # already retreived the last value
# XXX: which of these impls is fastest? # XXX: which of these impls is fastest?
# subs = state.subs.copy() # subs = state.subs.copy()
# subs.pop(key) # subs.pop(key)
@ -251,54 +311,85 @@ class BroadcastReceiver(ReceiveChannel):
# consumers will be awoken with a sequence of -1 # consumers will be awoken with a sequence of -1
# and will potentially try to rewait the underlying # and will potentially try to rewait the underlying
# receiver instead of just cancelling immediately. # receiver instead of just cancelling immediately.
self._state.cancelled = True self._state.cancelled[key] = current_task()
if event.statistics().tasks_waiting: if event.statistics().tasks_waiting:
event.set() event.set()
raise raise
finally: finally:
# Reset receiver waiter task event for next blocking condition. # Reset receiver waiter task event for next blocking condition.
# this MUST be reset even if the above ``.recv()`` call # this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on # was cancelled to avoid the next consumer from blocking on
# an event that won't be set! # an event that won't be set!
state.recv_ready = None state.recv_ready = None
async def receive(self) -> ReceiveType:
key = self.key
state = self._state
try:
return self.receive_nowait(
_key=key,
_state=state,
)
except trio.WouldBlock:
pass
# current task already has the latest value **and** is the
# first task to begin waiting for a new one so we begin blocking
# until rescheduled with the a new value from the underlying.
if state.recv_ready is None:
return await self._receive_from_underlying(key, state)
# This task is all caught up and ready to receive the latest # This task is all caught up and ready to receive the latest
# value, so queue sched it on the internal event. # value, so queue/schedule it to be woken on the next internal
# event.
else: else:
seq = state.subs[key] while state.recv_ready is not None:
assert seq == -1 # sanity # seq = state.subs[key]
# assert seq == -1 # sanity
_, ev = state.recv_ready _, ev = state.recv_ready
await ev.wait() await ev.wait()
try:
return self.receive_nowait(
_key=key,
_state=state,
)
except trio.WouldBlock:
if self._closed:
raise trio.ClosedResourceError
# NOTE: if we ever would like the behaviour where if the subs = state.subs
# first task to recv on the underlying is cancelled but it if (
# still DOES trigger the ``.recv_ready``, event we'll likely need len(subs) == 1
# this logic: and key in subs
# or cancelled
):
# XXX: we are the last and only user of this BR so
# likely it makes sense to unwind back to the
# underlying?
# import tractor
# await tractor.breakpoint()
log.warning(
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'
)
if seq > -1:
# stuff from above..
seq = state.subs[key]
value = state.queue[seq]
state.subs[key] -= 1
return value
elif seq == -1:
# XXX: In the case where the first task to allocate the # XXX: In the case where the first task to allocate the
# ``.recv_ready`` event is cancelled we will be woken with # ``.recv_ready`` event is cancelled we will be woken
# a non-incremented sequence number and thus will read the # with a non-incremented sequence number (the ``-1``
# oldest value if we use that. Instead we need to detect if # sentinel) and thus will read the oldest value if we
# we have not been incremented and then receive again. # use that. Instead we need to detect if we have not
return await self.receive() # been incremented and then receive again.
# return await self.receive()
else: return await self._receive_from_underlying(key, state)
raise ValueError(f'Invalid sequence {seq}!?')
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(
self, self,
raise_on_lag: bool = True,
) -> AsyncIterator[BroadcastReceiver]: ) -> AsyncIterator[BroadcastReceiver]:
''' '''
Subscribe for values from this broadcast receiver. Subscribe for values from this broadcast receiver.
@ -316,6 +407,7 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan=self._rx, rx_chan=self._rx,
state=state, state=state,
receive_afunc=self._recv, receive_afunc=self._recv,
raise_on_lag=raise_on_lag,
) )
# assert clone in state.subs # assert clone in state.subs
assert br.key in state.subs assert br.key in state.subs
@ -352,7 +444,8 @@ def broadcast_receiver(
recv_chan: AsyncReceiver, recv_chan: AsyncReceiver,
max_buffer_size: int, max_buffer_size: int,
**kwargs, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
) -> BroadcastReceiver: ) -> BroadcastReceiver:
@ -363,5 +456,6 @@ def broadcast_receiver(
maxlen=max_buffer_size, maxlen=max_buffer_size,
subs={}, subs={},
), ),
**kwargs, receive_afunc=receive_afunc,
raise_on_lag=raise_on_lag,
) )