Compare commits

..

37 Commits

Author SHA1 Message Date
Tyler Goodlet 34bb787d36 pdbp: adding typing to config settings vars 2023-05-08 12:02:42 -04:00
Tyler Goodlet 53f6fa4867 pdbp: flip dep name 2023-05-08 12:01:56 -04:00
Tyler Goodlet 0fe4ce5432 TOSQUASH 4759e30: turn it ON i guess? XD 2023-04-20 19:14:14 -04:00
Tyler Goodlet f65ed12c21 Oof, fix remaining `Actor.cancel()` in `Actor._from_parent()` 2023-04-20 19:13:35 -04:00
Tyler Goodlet 4759e301cb `pdbp`: turn off line truncating by default, fixes terminal resizing stuff 2023-04-19 15:31:02 -04:00
Tyler Goodlet ed5eabd054 Yeahh.. maybe sticky off by default is a little better for us XD 2023-04-18 09:43:14 -04:00
Tyler Goodlet 0f24d4b83c Add a debug-mode-breakpoint-causes-hang case!
Only found this by luck more or less (while working on something in
a client project) and it turns out we can actually get to (yet another)
hang state where SIGINT will be ignored by the root actor on teardown..

I've added all the necessary logic flags to reproduce. We obviously need
a follow up bug issue and a test suite to replicate!

It appears as though the following are required based on very light
tinkering:
- infected asyncio mode active
- debug mode active
- the `trio` context must breakpoint *before* `.started()`-ing
- the `asyncio` must **not** error
2023-04-17 13:43:16 -04:00
Tyler Goodlet 19b7f9c71a Some more 3.10+ optional type sigs 2023-04-17 13:43:16 -04:00
Tyler Goodlet b7341bb81e Add (first-draft) infected-`asyncio` actor task uses debugger example 2023-04-17 13:43:16 -04:00
Tyler Goodlet 4d3c109277 Restore `breakpoint()` hook after runtime exits
Previously we were leaking our (pdb++) override into the Python runtime
which would always result in a runtime error whenever `breakpoint()` is
called outside our runtime; after exit of the root actor . This
explicitly restores any previous hook override (detected during startup)
or deletes the hook and restores the environment if none existed prior.

Also adds a new WIP debugging example script to ensure breakpointing
works as normal after runtime close; this will be added to the test
suite.
2023-04-17 13:43:16 -04:00
Tyler Goodlet d0a65e8922 Hide actor nursery exit frame 2023-04-17 13:43:16 -04:00
Tyler Goodlet 61b3a72b7c First try: switch debug machinery over to `pdbp` B) 2023-04-17 13:43:16 -04:00
Tyler Goodlet f4ed2d29f8 Use multiline import for debug mod 2023-04-17 13:43:16 -04:00
Tyler Goodlet 205d38abfa Change over debugger tests to use `PROMPT` var.. 2023-04-17 13:43:16 -04:00
Tyler Goodlet ae1fa4c75e Tweak doc string 2023-04-14 18:08:08 -04:00
Tyler Goodlet c7b56ae639 Move move context code into new `._context` mod 2023-04-14 16:35:25 -04:00
Tyler Goodlet 97446b84c0 Drop caller cancels overrun test; covered in new tests 2023-04-14 16:35:25 -04:00
Tyler Goodlet b5a27e7864 Ignore drainer-task nursery RTE during context exit 2023-04-14 16:35:25 -04:00
Tyler Goodlet a7faa26686 Set `Context._scope_nursery` on callee side too
Because obviously we probably want to support `allow_overruns` on the
remote callee side as well XD

Only found the bugs fixed in this patch this thanks to writing a much
more exhaustive test set for overrun cases B)
2023-04-14 16:35:25 -04:00
Tyler Goodlet 1183276653 Seriously cover all overrun cases
This actually caught further runtime bugs so it's gud i tried..
Add overrun-ignore enabled / disabled cases and error catching for all
of them. More or less this should cover every possible outcome when
it comes to setting `allow_overruns: bool` i hope XD
2023-04-14 16:35:25 -04:00
Tyler Goodlet 8bd4db150b Adjust aio test for silent cancellation by parent 2023-04-14 16:35:25 -04:00
Tyler Goodlet 6aa780d0cd Fix cluster test to use `allow_overruns` 2023-04-14 16:35:25 -04:00
Tyler Goodlet 45f601a035 Flip allocate log msgs to debug 2023-04-14 16:35:25 -04:00
Tyler Goodlet ec4b72259d Drop brackpressure usage from fan out tests 2023-04-14 16:35:25 -04:00
Tyler Goodlet e97ed377b0 Remote `Context` cancellation semantics rework B)
This adds remote cancellation semantics to our `tractor.Context`
machinery to more closely match that of `trio.CancelScope` but
with operational differences to handle the nature of parallel tasks interoperating
across multiple memory boundaries:

- if an actor task cancels some context it has opened via
  `Context.cancel()`, the remote (scope linked) task will be cancelled
  using the normal `CancelScope` semantics of `trio` meaning the remote
  cancel scope surrounding the far side task is cancelled and
  `trio.Cancelled`s are expected to be raised in that scope as per
  normal `trio` operation, and in the case where no error is raised
  in that remote scope, a `ContextCancelled` error is raised inside the
  runtime machinery and relayed back to the opener/caller side of the
  context.
- if any actor task cancels a full remote actor runtime using
  `Portal.cancel_actor()` the same semantics as above apply except every
  other remote actor task which also has an open context with the actor
  which was cancelled will also be sent a `ContextCancelled` **but**
  with the `.canceller` field set to the uid of the original cancel
  requesting actor.

This changeset also includes a more "proper" solution to the issue of
"allowing overruns" during streaming without attempting to implement any
form of IPC streaming backpressure. Implementing task-granularity
backpressure cross-process turns out to be more or less impossible
without augmenting out streaming protocol (likely at the cost of
performance). Further allowing overruns requires special care since
any blocking of the runtime RPC msg loop task effectively can block
control msgs such as cancels and stream terminations.

The implementation details per abstraction layer are as follows.

._streaming.Context:
- add a new contructor factor func `mk_context()` which provides
  a strictly private init-er whilst allowing us to not have to define
  an `.__init__()` on the type def.
- add public `.cancel_called` and `.cancel_called_remote` properties.
- general rename of what was the internal `._backpressure` var to
  `._allow_overruns: bool`.
- move the old contents of `Actor._push_result()` into a new
  `._deliver_msg()` allowing for better encapsulation of per-ctx
  msg handling.
 - always check for received 'error' msgs and process them with the new
   `_maybe_cancel_and_set_remote_error()` **before** any msg delivery to
   the local task, thus guaranteeing error and cancellation handling
   despite any overflow handling.
- add a new `._drain_overflows()` task-method for use with new
  `._allow_overruns: bool = True` mode.
 - add back a `._scope_nursery: trio.Nursery` (allocated in
   `Portal.open_context()`) who's sole purpose is to spawn a single task
   which runs the above method; anything else is an error.
 - augment `._deliver_msg()` to start a task and run the above method
   when operating in no overrun mode; the task queues overflow msgs and
   attempts to send them to the underlying mem chan using a blocking
   `.send()` call.
 - on context exit, any existing "drainer task" will be cancelled and
   remaining overflow queued msgs are discarded with a warning.
- rename `._error` -> `_remote_error` and set it in a new method
  `_maybe_cancel_and_set_remote_error()` which is called before
  processing
- adjust `.result()` to always call `._maybe_raise_remote_err()` at its
  start such that whenever a `ContextCancelled` arrives we do logic for
  whether or not to immediately raise that error or ignore it due to the
  current actor being the one who requested the cancel, by checking the
  error's `.canceller` field.
 - set the default value of `._result` to be `id(Context()` thus avoiding
   conflict with any `.result()` actually being `False`..

._runtime.Actor:
- augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to
  take a `requesting_uid: tuple` indicating the source actor of every
  cancellation request.
- pass through the new `Context._allow_overruns` through `.get_context()`
- call the new `Context._deliver_msg()` from `._push_result()` (since
  the factoring out that method's contents).

._runtime._invoke:
- `TastStatus.started()` back a `Context` (unless an error is raised)
  instead of the cancel scope to make it easy to set/get state on that
  context for the purposes of cancellation and remote error relay.
- always raise any remote error via `Context._maybe_raise_remote_err()`
  before doing any `ContextCancelled` logic.
- assign any `Context._cancel_called_remote` set by the `requesting_uid`
  cancel methods (mentioned above) to the `ContextCancelled.canceller`.

._runtime.process_messages:
- always pass a `requesting_uid: tuple` to `Actor.cancel()` and
  `._cancel_task` to that any corresponding `ContextCancelled.canceller`
  can be set inside `._invoke()`.
2023-04-14 16:35:25 -04:00
Tyler Goodlet 1ec30577de Only tuplize `.canceller` if non-`None` 2023-04-14 16:35:25 -04:00
Tyler Goodlet 0e81350a42 Move `NoRuntime` import inside `current_actor()` to avoid cycle 2023-04-14 16:35:25 -04:00
Tyler Goodlet ac51cf07b9 Augment test cases for callee-returns-result early
Turns out stuff was totally broken in these cases because we're either
closing the underlying mem chan too early or not handling the
"allow_overruns" mode's cancellation correctly..
2023-04-14 16:35:25 -04:00
Tyler Goodlet e16e7ca82a Add new remote error introspection attrs
To handle both remote cancellation this adds `ContextCanceled.canceller:
tuple` the uid of the cancel requesting actor and is expected to be set
by the runtime when servicing any remote cancel request. This makes it
possible for `ContextCancelled` receivers to know whether "their actor
runtime" is the source of the cancellation.

Also add an explicit `RemoteActor.src_actor_uid` which better formalizes
the notion of "which remote actor" the error originated from.

Both of these new attrs are expected to be packed in the `.msgdata` when
the errors are loaded locally.
2023-04-14 16:35:25 -04:00
Tyler Goodlet 7dd5d8d1f8 Add new set of context cancellation tests
These will verify new changes to the runtime/messaging core which allows
us to adopt an "ignore cancel if requested by us" style handling of
`ContextCancelled` more like how `trio` does with
`trio.Nursery.cancel_scope.cancel()`. We now expect
a `ContextCancelled.canceller: tuple` which is set to the actor uid of
the actor which requested the cancellation which eventually resulted in
the remote error-msg.

Also adds some experimental tweaks to the "backpressure" test which it
turns out is very problematic in coordination with context cancellation
since blocking on the feed mem chan to some task will block the ipc msg
loop and thus handling of cancellation.. More to come to both the test
and core to address this hopefully since right now this test is failing.
2023-04-14 16:35:25 -04:00
Tyler Goodlet 8913829511 Log waiter task cancelling msg as cancel-level 2023-04-14 16:35:25 -04:00
Tyler Goodlet 60bff71cd3 Assign `RemoteActorError` boxed error type for context cancelleds 2023-04-14 16:35:25 -04:00
Tyler Goodlet 29a1171142 Change a bunch of log levels to cancel, including any `ContextCancelled` handling 2023-04-14 16:35:25 -04:00
Tyler Goodlet b6c7f423f0 Add some log-level method doc-strings 2023-04-14 16:35:25 -04:00
Tyler Goodlet 4db87d3c43 Tweak context doc str 2023-04-14 16:35:25 -04:00
Tyler Goodlet 5f1e83e741 More single doc-strs in discovery mod 2023-04-14 16:35:25 -04:00
Tyler Goodlet ea2cc9ec75 Enable `Context` backpressure by default; avoid startup race-crashes? 2023-04-14 16:35:25 -04:00
26 changed files with 1676 additions and 794 deletions

View File

@ -6,14 +6,8 @@
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka
non-shared-memory threads) which maintain structured
concurrency (SC) *end-to-end* inside a *supervision tree*.
Cross-process (and thus cross-host) SC is accomplished through the
combined use of our "actor nurseries_" and an "SC-transitive IPC
protocol" constructed on top of multiple Pythons each running a ``trio``
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio``
scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
@ -29,8 +23,7 @@ Features
- **It's just** a ``trio`` API
- *Infinitely nesteable* process trees
- Builtin IPC streaming APIs with task fan-out broadcasting
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
`pdb++`_ thanks to @mdmintz!)
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
- Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC
@ -125,7 +118,7 @@ Zombie safe: self-destruct a process tree
f"running in pid {os.getpid()}"
)
await trio.sleep_forever()
await trio.sleep_forever()
async def main():
@ -156,7 +149,7 @@ it **is a bug**.
"Native" multi-process debugging
--------------------------------
Using the magic of `pdbp`_ and our internal IPC, we've
Using the magic of `pdb++`_ and our internal IPC, we've
been able to create a native feeling debugging experience for
any (sub-)process in your ``tractor`` tree.
@ -604,7 +597,6 @@ channel`_!
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
.. _pdbp: https://github.com/mdmintz/pdbp
.. _pdb++: https://github.com/pdbpp/pdbpp
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
.. _messages: https://en.wikipedia.org/wiki/Message_passing

View File

@ -0,0 +1,117 @@
import asyncio
import trio
import tractor
from tractor import to_asyncio
async def aio_sleep_forever():
await asyncio.sleep(float('inf'))
async def bp_then_error(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
raise_after_bp: bool = True,
) -> None:
# sync with ``trio``-side (caller) task
to_trio.send_nowait('start')
# NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actoq in debug
# in terms of making it clear it's asyncio mucking about.
breakpoint()
# short checkpoint / delay
await asyncio.sleep(0.5)
if raise_after_bp:
raise ValueError('blah')
# TODO: test case with this so that it gets cancelled?
else:
# XXX NOTE: this is required in order to get the SIGINT-ignored
# hang case documented in the module script section!
await aio_sleep_forever()
@tractor.context
async def trio_ctx(
ctx: tractor.Context,
bp_before_started: bool = False,
):
# this will block until the ``asyncio`` task sends a "first"
# message, see first line in above func.
async with (
to_asyncio.open_channel_from(
bp_then_error,
raise_after_bp=not bp_before_started,
) as (first, chan),
trio.open_nursery() as n,
):
assert first == 'start'
if bp_before_started:
await tractor.breakpoint()
await ctx.started(first)
n.start_soon(
to_asyncio.run_task,
aio_sleep_forever,
)
await trio.sleep_forever()
async def main(
bps_all_over: bool = False,
) -> None:
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
)
async with p.open_context(
trio_ctx,
bp_before_started=bps_all_over,
) as (ctx, first):
assert first == 'start'
if bps_all_over:
await tractor.breakpoint()
# await trio.sleep_forever()
await ctx.cancel()
assert 0
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
# await p.cancel_actor()
if __name__ == '__main__':
# works fine B)
trio.run(main)
# will hang and ignores SIGINT !!
# NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it
# manually..
# trio.run(main, True)

View File

@ -1,7 +0,0 @@
Drop `trio.Process.aclose()` usage, copy into our spawning code.
The details are laid out in https://github.com/goodboy/tractor/issues/330.
`trio` changed is process running quite some time ago, this just copies
out the small bit we needed (from the old `.aclose()`) for hard kills
where a soft runtime cancel request fails and our "zombie killer"
implementation kicks in.

View File

@ -1,15 +0,0 @@
Switch to using the fork & fix of `pdb++`, `pdbp`:
https://github.com/mdmintz/pdbp
Allows us to sidestep a variety of issues that aren't being maintained
in the upstream project thanks to the hard work of @mdmintz!
We also include some default settings adjustments as per recent
development on the fork:
- sticky mode is still turned on by default but now activates when
a using the `ll` repl command.
- turn off line truncation by default to avoid inter-line gaps when
resizing the terimnal during use.
- when using the backtrace cmd either by `w` or `bt`, the config
automatically switches to non-sticky mode.

View File

@ -1,7 +1,7 @@
pytest
pytest-trio
pytest-timeout
pdbp
pdbpp
mypy
trio_typing
pexpect

View File

@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup(
name="tractor",
version='0.1.0a6dev0', # alpha zone
description='structured concurrrent `trio`-"actors"',
description='structured concurrrent "actors"',
long_description=readme,
license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='goodboy_foss@protonmail.com',
maintainer_email='jgbt@protonmail.com',
url='https://github.com/goodboy/tractor',
platforms=['linux', 'windows'],
packages=[
@ -52,10 +52,12 @@ setup(
# tooling
'tricycle',
'trio_typing',
# tooling
'colorlog',
'wrapt',
# IPC serialization
# serialization
'msgspec',
# debug mode REPL
@ -71,9 +73,10 @@ setup(
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
],
tests_require=['pytest'],
python_requires=">=3.10",
python_requires=">=3.9",
keywords=[
'trio',
'async',

View File

@ -49,7 +49,7 @@ async def worker(
await ctx.started()
async with ctx.open_stream(
backpressure=True,
allow_overruns=True,
) as stream:
# TODO: this with the below assert causes a hang bug?

View File

@ -13,7 +13,10 @@ from typing import Optional
import pytest
import trio
import tractor
from tractor._exceptions import StreamOverrun
from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
)
from conftest import tractor_test
@ -91,7 +94,10 @@ async def not_started_but_stream_opened(
@pytest.mark.parametrize(
'target',
[too_many_starteds, not_started_but_stream_opened],
[
too_many_starteds,
not_started_but_stream_opened,
],
ids='misuse_type={}'.format,
)
def test_started_misuse(target):
@ -228,6 +234,88 @@ def test_simple_context(
trio.run(main)
@pytest.mark.parametrize(
'callee_returns_early',
[True, False],
ids=lambda item: f'callee_returns_early={item}'
)
@pytest.mark.parametrize(
'cancel_method',
['ctx', 'portal'],
ids=lambda item: f'cancel_method={item}'
)
@pytest.mark.parametrize(
'chk_ctx_result_before_exit',
[True, False],
ids=lambda item: f'chk_ctx_result_before_exit={item}'
)
def test_caller_cancels(
cancel_method: str,
chk_ctx_result_before_exit: bool,
callee_returns_early: bool,
):
'''
Verify that when the opening side of a context (aka the caller)
cancels that context, the ctx does not raise a cancelled when
either calling `.result()` or on context exit.
'''
async def check_canceller(
ctx: tractor.Context,
) -> None:
# should not raise yet return the remote
# context cancelled error.
res = await ctx.result()
if callee_returns_early:
assert res == 'yo'
else:
err = res
assert isinstance(err, ContextCancelled)
assert (
tuple(err.canceller)
==
tractor.current_actor().uid
)
async def main():
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
'simple_context',
enable_modules=[__name__],
)
timeout = 0.5 if not callee_returns_early else 2
with trio.fail_after(timeout):
async with portal.open_context(
simple_setup_teardown,
data=10,
block_forever=not callee_returns_early,
) as (ctx, sent):
if callee_returns_early:
# ensure we block long enough before sending
# a cancel such that the callee has already
# returned it's result.
await trio.sleep(0.5)
if cancel_method == 'ctx':
await ctx.cancel()
else:
await portal.cancel_actor()
if chk_ctx_result_before_exit:
await check_canceller(ctx)
if not chk_ctx_result_before_exit:
await check_canceller(ctx)
if cancel_method != 'portal':
await portal.cancel_actor()
trio.run(main)
# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
@ -506,7 +594,6 @@ async def test_callee_cancels_before_started():
cancel_self,
) as (ctx, sent):
async with ctx.open_stream():
await trio.sleep_forever()
# raises a special cancel signal
@ -559,7 +646,6 @@ async def keep_sending_from_callee(
'overrun_by',
[
('caller', 1, never_open_stream),
('cancel_caller_during_overrun', 1, never_open_stream),
('callee', 0, keep_sending_from_callee),
],
ids='overrun_condition={}'.format,
@ -589,14 +675,13 @@ def test_one_end_stream_not_opened(overrun_by):
if 'caller' in overrunner:
async with ctx.open_stream() as stream:
# itersend +1 msg more then the buffer size
# to cause the most basic overrun.
for i in range(buf_size):
print(f'sending {i}')
await stream.send(i)
if 'cancel' in overrunner:
# without this we block waiting on the child side
await ctx.cancel()
else:
# expect overrun error to be relayed back
# and this sleep interrupted
@ -610,7 +695,9 @@ def test_one_end_stream_not_opened(overrun_by):
# 2 overrun cases and the no overrun case (which pushes right up to
# the msg limit)
if overrunner == 'caller' or 'cance' in overrunner:
if (
overrunner == 'caller'
):
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
@ -634,40 +721,102 @@ async def echo_back_sequence(
ctx: tractor.Context,
seq: list[int],
msg_buffer_size: Optional[int] = None,
wait_for_cancel: bool,
allow_overruns_side: str,
be_slow: bool = False,
msg_buffer_size: int = 1,
) -> None:
'''
Send endlessly on the calleee stream.
Send endlessly on the calleee stream using a small buffer size
setting on the contex to simulate backlogging that would normally
cause overruns.
'''
# NOTE: ensure that if the caller is expecting to cancel this task
# that we stay echoing much longer then they are so we don't
# return early instead of receive the cancel msg.
total_batches: int = 1000 if wait_for_cancel else 6
await ctx.started()
# await tractor.breakpoint()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,
# literally the point of this test XD
allow_overruns=(allow_overruns_side in {'child', 'both'}),
) as stream:
seq = list(seq) # bleh, `msgpack`...
count = 0
while count < 3:
# ensure mem chan settings are correct
assert (
ctx._send_chan._state.max_buffer_size
==
msg_buffer_size
)
seq = list(seq) # bleh, msgpack sometimes ain't decoded right
for _ in range(total_batches):
batch = []
async for msg in stream:
batch.append(msg)
if batch == seq:
break
if be_slow:
await trio.sleep(0.05)
print('callee waiting on next')
for msg in batch:
print(f'callee sending {msg}')
await stream.send(msg)
count += 1
return 'yo'
print(
'EXITING CALLEEE:\n'
f'{ctx.cancel_called_remote}'
)
return 'yo'
def test_stream_backpressure():
@pytest.mark.parametrize(
# aka the side that will / should raise
# and overrun under normal conditions.
'allow_overruns_side',
['parent', 'child', 'none', 'both'],
ids=lambda item: f'allow_overruns_side={item}'
)
@pytest.mark.parametrize(
# aka the side that will / should raise
# and overrun under normal conditions.
'slow_side',
['parent', 'child'],
ids=lambda item: f'slow_side={item}'
)
@pytest.mark.parametrize(
'cancel_ctx',
[True, False],
ids=lambda item: f'cancel_ctx={item}'
)
def test_maybe_allow_overruns_stream(
cancel_ctx: bool,
slow_side: str,
allow_overruns_side: str,
loglevel: str,
):
'''
Demonstrate small overruns of each task back and forth
on a stream not raising any errors by default.
on a stream not raising any errors by default by setting
the ``allow_overruns=True``.
The original idea here was to show that if you set the feeder mem
chan to a size smaller then the # of msgs sent you could could not
get a `StreamOverrun` crash plus maybe get all the msgs that were
sent. The problem with the "real backpressure" case is that due to
the current arch it can result in the msg loop being blocked and thus
blocking cancellation - which is like super bad. So instead this test
had to be adjusted to more or less just "not send overrun errors" so
as to handle the case where the sender just moreso cares about not getting
errored out when it send to fast..
'''
async def main():
@ -675,38 +824,104 @@ def test_stream_backpressure():
portal = await n.start_actor(
'callee_sends_forever',
enable_modules=[__name__],
loglevel=loglevel,
# debug_mode=True,
)
seq = list(range(3))
seq = list(range(10))
async with portal.open_context(
echo_back_sequence,
seq=seq,
msg_buffer_size=1,
wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side,
) as (ctx, sent):
assert sent is None
async with ctx.open_stream(msg_buffer_size=1) as stream:
count = 0
while count < 3:
async with ctx.open_stream(
msg_buffer_size=1 if slow_side == 'parent' else None,
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
) as stream:
total_batches: int = 2
for _ in range(total_batches):
for msg in seq:
print(f'caller sending {msg}')
# print(f'root tx {msg}')
await stream.send(msg)
await trio.sleep(0.1)
if slow_side == 'parent':
# NOTE: we make the parent slightly
# slower, when it is slow, to make sure
# that in the overruns everywhere case
await trio.sleep(0.16)
batch = []
async for msg in stream:
print(f'root rx {msg}')
batch.append(msg)
if batch == seq:
break
count += 1
if cancel_ctx:
# cancel the remote task
print('sending root side cancel')
await ctx.cancel()
# here the context should return
assert await ctx.result() == 'yo'
res = await ctx.result()
if cancel_ctx:
assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == tractor.current_actor().uid
else:
print(f'RX ROOT SIDE RESULT {res}')
assert res == 'yo'
# cancel the daemon
await portal.cancel_actor()
trio.run(main)
if (
allow_overruns_side == 'both'
or slow_side == allow_overruns_side
):
trio.run(main)
elif (
slow_side != allow_overruns_side
):
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
err = excinfo.value
if (
allow_overruns_side == 'none'
):
# depends on timing is is racy which side will
# overrun first :sadkitty:
# NOTE: i tried to isolate to a deterministic case here
# based on timeing, but i was kinda wasted, and i don't
# think it's sane to catch them..
assert err.type in (
tractor.RemoteActorError,
StreamOverrun,
)
elif (
slow_side == 'child'
):
assert err.type == StreamOverrun
elif slow_side == 'parent':
assert err.type == tractor.RemoteActorError
assert 'StreamOverrun' in err.msgdata['tb_str']
else:
# if this hits the logic blocks from above are not
# exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
@tractor.context
@ -737,18 +952,18 @@ async def attach_to_sleep_forever():
finally:
# XXX: previously this would trigger local
# ``ContextCancelled`` to be received and raised in the
# local context overriding any local error due to
# logic inside ``_invoke()`` which checked for
# an error set on ``Context._error`` and raised it in
# under a cancellation scenario.
# The problem is you can have a remote cancellation
# that is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we weren't the side of
# the context to initiate it, i.e.
# local context overriding any local error due to logic
# inside ``_invoke()`` which checked for an error set on
# ``Context._error`` and raised it in a cancellation
# scenario.
# ------
# The problem is you can have a remote cancellation that
# is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we **were not** the side
# of the context to initiate it, i.e.
# ``Context._cancel_called`` should **NOT** have been
# set. The special logic to handle this case is now
# inside ``Context._may_raise_from_remote_msg()`` XD
# inside ``Context._maybe_raise_from_remote_msg()`` XD
await peer_ctx.cancel()
@ -769,9 +984,10 @@ async def error_before_started(
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
Verify that an error raised in a remote context which itself opens another
remote context, which it cancels, does not ovverride the original error that
caused the cancellation of the secondardy context.
Verify that an error raised in a remote context which itself opens
another remote context, which it cancels, does not ovverride the
original error that caused the cancellation of the secondardy
context.
'''
async def main():

View File

@ -151,6 +151,19 @@ def ctlc(
use_ctlc = request.param
# TODO: we can remove this bc pdbp right?
if (
sys.version_info <= (3, 10)
and use_ctlc
):
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
node = request.node
markers = node.own_markers
for mark in markers:
@ -181,15 +194,13 @@ def ctlc(
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_error(spawn, user_in_out):
'''
Demonstrate crash handler entering pdb from basic error in root actor.
'''
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
"""
user_input, expect_err_str = user_in_out
child = spawn('root_actor_error')
# scan for the prompt
# scan for the pdbpp prompt
expect(child, PROMPT)
before = str(child.before.decode())
@ -220,7 +231,7 @@ def test_root_actor_bp(spawn, user_in_out):
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
# scan for the prompt
# scan for the pdbpp prompt
child.expect(PROMPT)
assert 'Error' not in str(child.before)
@ -328,7 +339,7 @@ def test_subactor_error(
'''
child = spawn('subactor_error')
# scan for the prompt
# scan for the pdbpp prompt
child.expect(PROMPT)
before = str(child.before.decode())
@ -376,7 +387,7 @@ def test_subactor_breakpoint(
child = spawn('subactor_breakpoint')
# scan for the prompt
# scan for the pdbpp prompt
child.expect(PROMPT)
before = str(child.before.decode())
@ -437,7 +448,7 @@ def test_multi_subactors(
'''
child = spawn(r'multi_subactors')
# scan for the prompt
# scan for the pdbpp prompt
child.expect(PROMPT)
before = str(child.before.decode())
@ -677,7 +688,7 @@ def test_multi_subactors_root_errors(
'''
child = spawn('multi_subactor_root_errors')
# scan for the prompt
# scan for the pdbpp prompt
child.expect(PROMPT)
# at most one subactor should attach before the root is cancelled

View File

@ -15,6 +15,7 @@ import tractor
from tractor import (
to_asyncio,
RemoteActorError,
ContextCancelled,
)
from tractor.trionics import BroadcastReceiver
@ -224,14 +225,23 @@ def test_context_spawns_aio_task_that_errors(
await trio.sleep_forever()
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
return await ctx.result()
err = excinfo.value
assert isinstance(err, RemoteActorError)
if parent_cancels:
assert err.type == trio.Cancelled
# bc the parent made the cancel request,
# the error is not raised locally but instead
# the context is exited silently
res = trio.run(main)
assert isinstance(res, ContextCancelled)
assert 'root' in res.canceller[0]
else:
expect = RemoteActorError
with pytest.raises(expect) as excinfo:
trio.run(main)
err = excinfo.value
assert isinstance(err, expect)
assert err.type == AssertionError

View File

@ -86,7 +86,7 @@ async def open_sequence_streamer(
) as (ctx, first):
assert first is None
async with ctx.open_stream(backpressure=True) as stream:
async with ctx.open_stream(allow_overruns=True) as stream:
yield stream
await portal.cancel_actor()
@ -413,8 +413,8 @@ def test_ensure_slow_consumers_lag_out(
seq = brx._state.subs[brx.key]
assert seq == len(brx._state.queue) - 1
# all backpressured entries in the underlying
# channel should have been copied into the caster
# all no_overruns entries in the underlying
# channel should have been copied into the bcaster
# queue trailing-window
async for i in rx:
print(f'bped: {i}')

View File

@ -15,18 +15,20 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
tractor: structured concurrent "actors".
tractor: structured concurrent ``trio``-"actors".
"""
from exceptiongroup import BaseExceptionGroup
from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
from ._context import (
Context,
context,
)
from ._streaming import (
MsgStream,
stream,
context,
)
from ._discovery import (
get_arbiter,

771
tractor/_context.py 100644
View File

@ -0,0 +1,771 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The fundamental cross process SC abstraction: an inter-actor,
cancel-scope linked task "context".
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from __future__ import annotations
from collections import deque
from contextlib import asynccontextmanager as acm
from dataclasses import (
dataclass,
field,
)
from functools import partial
import inspect
from pprint import pformat
from typing import (
Any,
Callable,
AsyncGenerator,
TYPE_CHECKING,
)
import warnings
import trio
from ._exceptions import (
unpack_error,
pack_error,
ContextCancelled,
StreamOverrun,
)
from .log import get_logger
from ._ipc import Channel
from ._streaming import MsgStream
from ._state import current_actor
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__)
@dataclass
class Context:
'''
An inter-actor, ``trio``-task communication context.
NB: This class should never be instatiated directly, it is delivered
by either,
- runtime machinery to a remotely started task or,
- by entering ``Portal.open_context()``.
and is always constructed using ``mkt_context()``.
Allows maintaining task or protocol specific state between
2 communicating, parallel executing actor tasks. A unique context is
allocated on each side of any task RPC-linked msg dialog, for
every request to a remote actor from a portal. On the "callee"
side a context is always allocated inside ``._runtime._invoke()``.
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, it can also open task
oriented message streams, and acts more or less as an IPC aware
inter-actor-task ``trio.CancelScope``.
'''
chan: Channel
cid: str
# these are the "feeder" channels for delivering
# message values to the local task from the runtime
# msg processing loop.
_recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel
_remote_func_type: str | None = None
# only set on the caller side
_portal: Portal | None = None # type: ignore # noqa
_result: Any | int = None
_remote_error: BaseException | None = None
# cancellation state
_cancel_called: bool = False
_cancel_called_remote: tuple | None = None
_cancel_msg: str | None = None
_scope: trio.CancelScope | None = None
_enter_debugger_on_cancel: bool = True
@property
def cancel_called(self) -> bool:
'''
Records whether cancellation has been requested for this context
by either an explicit call to ``.cancel()`` or an implicit call
due to an error caught inside the ``Portal.open_context()``
block.
'''
return self._cancel_called
@property
def cancel_called_remote(self) -> tuple[str, str] | None:
'''
``Actor.uid`` of the remote actor who's task was cancelled
causing this side of the context to also be cancelled.
'''
remote_uid = self._cancel_called_remote
if remote_uid:
return tuple(remote_uid)
@property
def cancelled_caught(self) -> bool:
return self._scope.cancelled_caught
# init and streaming state
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
# overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote
# task, only an ability to not lose messages when the local
# task is configured to NOT transmit ``StreamOverrun``s back
# to the other side.
_overflow_q: deque[dict] = field(
default_factory=partial(
deque,
maxlen=616,
)
)
_scope_nursery: trio.Nursery | None = None
_in_overrun: bool = False
_allow_overruns: bool = False
async def send_yield(
self,
data: Any,
) -> None:
warnings.warn(
"`Context.send_yield()` is now deprecated. "
"Use ``MessageStream.send()``. ",
DeprecationWarning,
stacklevel=2,
)
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
async def _maybe_cancel_and_set_remote_error(
self,
error_msg: dict[str, Any],
) -> None:
'''
(Maybe) unpack and raise a msg error into the local scope
nursery for this context.
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
'''
# If this is an error message from a context opened by
# ``Portal.open_context()`` we want to interrupt any ongoing
# (child) tasks within that context to be notified of the remote
# error relayed here.
#
# The reason we may want to raise the remote error immediately
# is that there is no guarantee the associated local task(s)
# will attempt to read from any locally opened stream any time
# soon.
#
# NOTE: this only applies when
# ``Portal.open_context()`` has been called since it is assumed
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
error = unpack_error(
error_msg,
self.chan,
)
# XXX: set the remote side's error so that after we cancel
# whatever task is the opener of this context it can raise
# that error as the reason.
self._remote_error = error
if (
isinstance(error, ContextCancelled)
):
log.cancel(
'Remote task-context sucessfully cancelled for '
f'{self.chan.uid}:{self.cid}'
)
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{error_msg["error"]["tb_str"]}'
)
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
# YES! this is way better and simpler!
if (
self._scope
):
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
self._cancel_called_remote = self.chan.uid
self._scope.cancel()
# NOTE: this usage actually works here B)
# from ._debug import breakpoint
# await breakpoint()
# XXX: this will break early callee results sending
# since when `.result()` is finally called, this
# chan will be closed..
# if self._recv_chan:
# await self._recv_chan.aclose()
async def cancel(
self,
msg: str | None = None,
timeout: float = 0.5,
# timeout: float = 1000,
) -> None:
'''
Cancel this inter-actor-task context.
Request that the far side cancel it's current linked context,
Timeout quickly in an attempt to sidestep 2-generals...
'''
side = 'caller' if self._portal else 'callee'
if msg:
assert side == 'callee', 'Only callee side can provide cancel msg'
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
# await _debug.breakpoint()
# breakpoint()
if side == 'caller':
if not self._portal:
raise RuntimeError(
"No portal found, this is likely a callee side context"
)
cid = self.cid
with trio.move_on_after(timeout) as cs:
# cs.shield = True
log.cancel(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
# print("EXITING CANCEL CALL")
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
# cancelled in the case where the connection is broken or
# some other network error occurred.
# if not self._portal.channel.connected():
if not self.chan.connected():
log.cancel(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
else:
log.cancel(
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
else:
self._cancel_msg = msg
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough?
# This probably gets into the discussion in
# https://github.com/goodboy/tractor/issues/36
assert self._scope
self._scope.cancel()
@acm
async def open_stream(
self,
allow_overruns: bool | None = False,
msg_buffer_size: int | None = None,
) -> AsyncGenerator[MsgStream, None]:
'''
Open a ``MsgStream``, a bi-directional stream connected to the
cross-actor (far end) task for this ``Context``.
This context manager must be entered on both the caller and
callee for the stream to logically be considered "connected".
A ``MsgStream`` is currently "one-shot" use, meaning if you
close it you can not "re-open" it for streaming and instead you
must re-establish a new surrounding ``Context`` using
``Portal.open_context()``. In the future this may change but
currently there seems to be no obvious reason to support
"re-opening":
- pausing a stream can be done with a message.
- task errors will normally require a restart of the entire
scope of the inter-actor task context due to the nature of
``trio``'s cancellation system.
'''
actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
# Likewise if the surrounding context has been cancelled we error here
# since it likely means the surrounding block was exited or
# killed
if self._cancel_called:
task = trio.lowlevel.current_task().name
raise ContextCancelled(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
)
if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx = actor.get_context(
self.chan,
self.cid,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._allow_overruns = allow_overruns
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if ctx._recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
if self._portal:
self._portal._streams.add(stream)
try:
self._stream_opened = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# NOTE: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await stream.aclose()
finally:
if self._portal:
try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
def _maybe_raise_remote_err(
self,
err: Exception,
) -> None:
# NOTE: whenever the context's "opener" side (task) **is**
# the side which requested the cancellation (likekly via
# ``Context.cancel()``), we don't want to re-raise that
# cancellation signal locally (would be akin to
# a ``trio.Nursery`` nursery raising ``trio.Cancelled``
# whenever ``CancelScope.cancel()`` was called) and instead
# silently reap the expected cancellation "error"-msg.
# if 'pikerd' in err.msgdata['tb_str']:
# # from . import _debug
# # await _debug.breakpoint()
# breakpoint()
if (
isinstance(err, ContextCancelled)
and (
self._cancel_called
or self.chan._cancel_called
or tuple(err.canceller) == current_actor().uid
)
):
return err
raise err # from None
async def result(self) -> Any | Exception:
'''
From some (caller) side task, wait for and return the final
result from the remote (callee) side's task.
This provides a mechanism for one task running in some actor to wait
on another task at the other side, in some other actor, to terminate.
If the remote task is still in a streaming state (it is delivering
values from inside a ``Context.open_stream():`` block, then those
msgs are drained but discarded since it is presumed this side of
the context has already finished with its own streaming logic.
If the remote context (or its containing actor runtime) was
canceled, either by a local task calling one of
``Context.cancel()`` or `Portal.cancel_actor()``, we ignore the
received ``ContextCancelled`` exception if the context or
underlying IPC channel is marked as having been "cancel called".
This is similar behavior to using ``trio.Nursery.cancel()``
wherein tasks which raise ``trio.Cancel`` are silently reaped;
the main different in this API is in the "cancel called" case,
instead of just not raising, we also return the exception *as
the result* since client code may be interested in the details
of the remote cancellation.
'''
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
# from . import _debug
# await _debug.breakpoint()
re = self._remote_error
if re:
self._maybe_raise_remote_err(re)
return re
if (
self._result == id(self)
and not self._remote_error
and not self._recv_chan._closed # type: ignore
):
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
msg = await self._recv_chan.receive()
try:
self._result = msg['return']
# NOTE: we don't need to do this right?
# XXX: only close the rx mem chan AFTER
# a final result is retreived.
# if self._recv_chan:
# await self._recv_chan.aclose()
break
except KeyError: # as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
err = unpack_error(
msg,
self._portal.channel
) # from msgerr
err = self._maybe_raise_remote_err(err)
self._remote_err = err
return self._remote_error or self._result
async def started(
self,
value: Any | None = None
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal:
raise RuntimeError(
f"Caller side context {self} can not call started!")
elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")
await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
async def _drain_overflows(
self,
) -> None:
'''
Private task spawned to push newly received msgs to the local
task which getting overrun by the remote side.
In order to not block the rpc msg loop, but also not discard
msgs received in this context, we need to async push msgs in
a new task which only runs for as long as the local task is in
an overrun state.
'''
self._in_overrun = True
try:
while self._overflow_q:
# NOTE: these msgs should never be errors since we always do
# the check prior to checking if we're in an overrun state
# inside ``.deliver_msg()``.
msg = self._overflow_q.popleft()
try:
await self._send_chan.send(msg)
except trio.BrokenResourceError:
log.warning(
f"{self._send_chan} consumer is already closed"
)
return
except trio.Cancelled:
# we are obviously still in overrun
# but the context is being closed anyway
# so we just warn that there are un received
# msgs still..
self._overflow_q.appendleft(msg)
fmt_msgs = ''
for msg in self._overflow_q:
fmt_msgs += f'{pformat(msg)}\n'
log.warning(
f'Context for {self.cid} is being closed while '
'in an overrun state!\n'
'Discarding the following msgs:\n'
f'{fmt_msgs}\n'
)
raise
finally:
# task is now finished with the backlog so mark us as
# no longer in backlog.
self._in_overrun = False
async def _deliver_msg(
self,
msg: dict,
draining: bool = False,
) -> bool:
cid = self.cid
chan = self.chan
uid = chan.uid
send_chan: trio.MemorySendChannel = self._send_chan
log.runtime(
f"Delivering {msg} from {uid} to caller {cid}"
)
error = msg.get('error')
if error:
await self._maybe_cancel_and_set_remote_error(msg)
if (
self._in_overrun
):
self._overflow_q.append(msg)
return False
try:
send_chan.send_nowait(msg)
return True
# if an error is deteced we should always
# expect it to be raised by any context (stream)
# consumer task
except trio.BrokenResourceError:
# TODO: what is the right way to handle the case where the
# local task has already sent a 'stop' / StopAsyncInteration
# to the other side but and possibly has closed the local
# feeder mem chan? Do we wait for some kind of ack or just
# let this fail silently and bubble up (currently)?
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed")
return False
# NOTE XXX: by default we do **not** maintain context-stream
# backpressure and instead opt to relay stream overrun errors to
# the sender; the main motivation is that using bp can block the
# msg handling loop which calls into this method!
except trio.WouldBlock:
# XXX: always push an error even if the local
# receiver is in overrun state.
# await self._maybe_cancel_and_set_remote_error(msg)
local_uid = current_actor().uid
lines = [
f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
# TODO: put remote task name here if possible?
f'remote sender actor: {uid}',
# TODO: put task func name here and maybe an arrow
# from sender to overrunner?
# f'local task {self.func_name}'
]
if not self._stream_opened:
lines.insert(
1,
f'\n*** No stream open on `{local_uid[0]}` side! ***\n'
)
text = '\n'.join(lines)
# XXX: lul, this really can't be backpressure since any
# blocking here will block the entire msg loop rpc sched for
# a whole channel.. maybe we should rename it?
if self._allow_overruns:
text += f'\nStarting overflow queuing task on msg: {msg}'
log.warning(text)
if (
not self._in_overrun
):
self._overflow_q.append(msg)
n = self._scope_nursery
assert not n.child_tasks
try:
n.start_soon(
self._drain_overflows,
)
except RuntimeError:
# if the nursery is already cancelled due to
# this context exiting or in error, we ignore
# the nursery error since we never expected
# anything different.
return False
else:
try:
raise StreamOverrun(text)
except StreamOverrun as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
return False
def mk_context(
chan: Channel,
cid: str,
msg_buffer_size: int = 2**6,
**kwargs,
) -> Context:
'''
Internal factory to create an inter-actor task ``Context``.
This is called by internals and should generally never be called
by user code.
'''
send_chan: trio.MemorySendChannel
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
ctx = Context(
chan,
cid,
_send_chan=send_chan,
_recv_chan=recv_chan,
**kwargs,
)
ctx._result: int | Any = id(ctx)
return ctx
def context(func: Callable) -> Callable:
'''
Mark an async function as a streaming routine with ``@context``.
'''
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore
sig = inspect.signature(func)
params = sig.parameters
if 'ctx' not in params:
raise TypeError(
"The first argument to the context function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func

View File

@ -54,6 +54,18 @@ from ._exceptions import (
)
from ._ipc import Channel
# TODO: we can drop this now yah?
# try:
# # wtf: only exported when installed in dev mode?
# import pdbp
# except ImportError:
# # pdbpp is installed in regular mode...it monkey patches stuff
# import pdb
# xpm = getattr(pdb, 'xpm', None)
# assert xpm, "pdbpp is not installed?" # type: ignore
# pdbpp = pdb
log = get_logger(__name__)

View File

@ -41,8 +41,10 @@ async def get_arbiter(
port: int,
) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
'''Return a portal instance connected to a local or remote
'''
Return a portal instance connected to a local or remote
arbiter.
'''
actor = current_actor()
@ -134,12 +136,16 @@ async def find_actor(
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] | None = None,
# registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.
'''
Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
"""
'''
actor = current_actor()
async with get_arbiter(

View File

@ -132,7 +132,7 @@ def _trio_main(
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI")
log.cancel(f"Actor {actor.uid} received KBI")
finally:
log.info(f"Actor {actor.uid} terminated")

View File

@ -18,18 +18,18 @@
Our classy exception set.
"""
import builtins
import importlib
from typing import (
Any,
Optional,
Type,
)
import importlib
import builtins
import traceback
import exceptiongroup as eg
import trio
from ._state import current_actor
_this_mod = importlib.import_module(__name__)
@ -44,7 +44,7 @@ class RemoteActorError(Exception):
def __init__(
self,
message: str,
suberror_type: Optional[Type[BaseException]] = None,
suberror_type: Type[BaseException] | None = None,
**msgdata
) -> None:
@ -53,21 +53,36 @@ class RemoteActorError(Exception):
self.type = suberror_type
self.msgdata = msgdata
@property
def src_actor_uid(self) -> tuple[str, str] | None:
return self.msgdata.get('src_actor_uid')
class InternalActorError(RemoteActorError):
"""Remote internal ``tractor`` error indicating
'''
Remote internal ``tractor`` error indicating
failure of some primitive or machinery.
"""
'''
class ContextCancelled(RemoteActorError):
'''
Inter-actor task context was cancelled by either a call to
``Portal.cancel_actor()`` or ``Context.cancel()``.
'''
@property
def canceller(self) -> tuple[str, str] | None:
value = self.msgdata.get('canceller')
if value:
return tuple(value)
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class ContextCancelled(RemoteActorError):
"Inter-actor task context cancelled itself on the callee side."
class NoResult(RuntimeError):
"No final result is expected for this actor"
@ -106,13 +121,17 @@ def pack_error(
else:
tb_str = traceback.format_exc()
return {
'error': {
'tb_str': tb_str,
'type_str': type(exc).__name__,
}
error_msg = {
'tb_str': tb_str,
'type_str': type(exc).__name__,
'src_actor_uid': current_actor().uid,
}
if isinstance(exc, ContextCancelled):
error_msg.update(exc.msgdata)
return {'error': error_msg}
def unpack_error(
@ -136,7 +155,7 @@ def unpack_error(
if type_name == 'ContextCancelled':
err_type = ContextCancelled
suberror_type = trio.Cancelled
suberror_type = RemoteActorError
else: # try to lookup a suitable local error type
for ns in [

View File

@ -45,10 +45,8 @@ from ._exceptions import (
NoResult,
ContextCancelled,
)
from ._streaming import (
Context,
MsgStream,
)
from ._context import Context
from ._streaming import MsgStream
log = get_logger(__name__)
@ -103,7 +101,7 @@ class Portal:
# When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Optional[Context] = None
self._expect_result: Context | None = None
self._streams: set[MsgStream] = set()
self.actor = current_actor()
@ -209,7 +207,10 @@ class Portal:
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield
with trio.move_on_after(timeout or self.cancel_timeout) as cs:
with trio.move_on_after(
timeout
or self.cancel_timeout
) as cs:
cs.shield = True
await self.run_from_ns('self', 'cancel')
@ -330,7 +331,9 @@ class Portal:
f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = NamespacePath.from_ref(
async_gen_func).to_tuple()
async_gen_func
).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
@ -396,13 +399,16 @@ class Portal:
raise TypeError(
f'{func} must be an async generator function!')
# TODO: i think from here onward should probably
# just be factored into an `@acm` inside a new
# a new `_context.py` mod.
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs
kwargs,
)
assert ctx._remote_func_type == 'context'
@ -426,29 +432,47 @@ class Portal:
f' but received a non-error msg:\n{pformat(msg)}'
)
_err: Optional[BaseException] = None
ctx._portal = self
_err: BaseException | None = None
ctx._portal: Portal = self
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
uid: tuple = self.channel.uid
cid: str = ctx.cid
etype: Type[BaseException] | None = None
# deliver context instance and .started() msg value in open tuple.
# deliver context instance and .started() msg value in enter
# tuple.
try:
async with trio.open_nursery() as scope_nursery:
ctx._scope_nursery = scope_nursery
# do we need this?
# await trio.lowlevel.checkpoint()
async with trio.open_nursery() as nurse:
ctx._scope_nursery = nurse
ctx._scope = nurse.cancel_scope
yield ctx, first
# when in allow_ovveruns mode there may be lingering
# overflow sender tasks remaining?
if nurse.child_tasks:
# ensure we are in overrun state with
# ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery!
if (
not ctx._allow_overruns
or len(nurse.child_tasks) > 1
):
raise RuntimeError(
'Context has sub-tasks but is '
'not in `allow_overruns=True` Mode!?'
)
ctx._scope.cancel()
except ContextCancelled as err:
_err = err
# swallow and mask cross-actor task context cancels that
# were initiated by *this* side's task.
if not ctx._cancel_called:
# context was cancelled at the far end but was
# not part of this end requesting that cancel
# so raise for the local task to respond and handle.
# XXX: this should NEVER happen!
# from ._debug import breakpoint
# await breakpoint()
raise
# if the context was cancelled by client code
@ -468,17 +492,17 @@ class Portal:
) as err:
etype = type(err)
# the context cancels itself on any cancel
# causing error.
if ctx.chan.connected():
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
# cancel ourselves on any error.
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
try:
await ctx.cancel()
else:
except trio.BrokenResourceError:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
@ -487,12 +511,7 @@ class Portal:
raise
finally:
# in the case where a runtime nursery (due to internal bug)
# or a remote actor transmits an error we want to be
# sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the
# msg loop nursery right?
else:
if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
@ -505,6 +524,7 @@ class Portal:
f'value from callee `{result}`'
)
finally:
# though it should be impossible for any tasks
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last

View File

@ -254,7 +254,9 @@ async def open_root_actor(
# tempn.start_soon(an.exited.wait)
logger.cancel("Shutting down root actor")
await actor.cancel()
await actor.cancel(
requesting_uid=actor.uid,
)
finally:
_state._current_actor = None

View File

@ -28,9 +28,11 @@ import inspect
import signal
import sys
from typing import (
Any, Optional,
Union, TYPE_CHECKING,
Any,
Callable,
Optional,
Union,
TYPE_CHECKING,
)
import uuid
from types import ModuleType
@ -44,7 +46,10 @@ import trio # type: ignore
from trio_typing import TaskStatus
from ._ipc import Channel
from ._streaming import Context
from ._context import (
mk_context,
Context,
)
from .log import get_logger
from ._exceptions import (
pack_error,
@ -53,7 +58,6 @@ from ._exceptions import (
is_multi_cancelled,
ContextCancelled,
TransportClosed,
StreamOverrun,
)
from . import _debug
from ._discovery import get_arbiter
@ -79,7 +83,7 @@ async def _invoke(
is_rpc: bool = True,
task_status: TaskStatus[
Union[trio.CancelScope, BaseException]
Union[Context, BaseException]
] = trio.TASK_STATUS_IGNORED,
):
'''
@ -99,7 +103,14 @@ async def _invoke(
# activated cancel scope ref
cs: Optional[trio.CancelScope] = None
ctx = actor.get_context(chan, cid)
ctx = actor.get_context(
chan,
cid,
# We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to
# open the stream with this option.
# allow_overruns=True,
)
context: bool = False
if getattr(func, '_tractor_stream_function', False):
@ -138,7 +149,10 @@ async def _invoke(
):
raise TypeError(f'{func} must be an async function!')
coro = func(**kwargs)
try:
coro = func(**kwargs)
except TypeError:
raise
if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
@ -150,7 +164,8 @@ async def _invoke(
# of the async gen in order to be sure the cancel
# is propagated!
with cancel_scope as cs:
task_status.started(cs)
ctx._scope = cs
task_status.started(ctx)
async with aclosing(coro) as agen:
async for item in agen:
# TODO: can we send values back in here?
@ -176,7 +191,8 @@ async def _invoke(
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
task_status.started(cs)
ctx._scope = cs
task_status.started(ctx)
await coro
if not cs.cancelled_caught:
@ -189,19 +205,26 @@ async def _invoke(
await chan.send({'functype': 'context', 'cid': cid})
try:
async with trio.open_nursery() as scope_nursery:
ctx._scope_nursery = scope_nursery
cs = scope_nursery.cancel_scope
task_status.started(cs)
async with trio.open_nursery() as nurse:
ctx._scope_nursery = nurse
ctx._scope = nurse.cancel_scope
task_status.started(ctx)
res = await coro
await chan.send({'return': res, 'cid': cid})
except BaseExceptionGroup:
# XXX: do we ever trigger this block any more?
except (
BaseExceptionGroup,
trio.Cancelled,
):
# if a context error was set then likely
# thei multierror was raised due to that
if ctx._error is not None:
raise ctx._error from None
if ctx._remote_error is not None:
raise ctx._remote_error
# maybe TODO: pack in ``trio.Cancelled.__traceback__`` here
# so they can be unwrapped and displayed on the caller
# side?
raise
finally:
@ -213,38 +236,67 @@ async def _invoke(
# associated child isn't in debug any more
await _debug.maybe_wait_for_debugger()
ctx = actor._contexts.pop((chan.uid, cid))
if ctx:
log.runtime(
f'Context entrypoint {func} was terminated:\n{ctx}'
)
assert cs
if cs.cancelled_caught:
if ctx.cancelled_caught:
# TODO: pack in ``trio.Cancelled.__traceback__`` here
# so they can be unwrapped and displayed on the caller
# side!
# first check for and raise any remote error
# before raising any context cancelled case
# so that real remote errors don't get masked as
# ``ContextCancelled``s.
re = ctx._remote_error
if re:
ctx._maybe_raise_remote_err(re)
fname = func.__name__
if ctx._cancel_called:
msg = f'`{fname}()` cancelled itself'
cs: trio.CancelScope = ctx._scope
if cs.cancel_called:
canceller = ctx._cancel_called_remote
# await _debug.breakpoint()
elif cs.cancel_called:
msg = (
f'`{fname}()` was remotely cancelled by its caller '
f'{ctx.chan.uid}'
# NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call
# ``Context.cancel()`` directly, we're going to
# need to change this logic branch since it will
# always enter..
if ctx._cancel_called:
msg = f'`{fname}()`@{actor.uid} cancelled itself'
else:
msg = (
f'`{fname}()`@{actor.uid} '
'was remotely cancelled by '
)
# if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs.
# it being some other random actor that for ex.
# some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx.
if canceller == ctx.chan.uid:
msg += f'its caller {canceller}'
else:
msg += f'remote actor {canceller}'
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += f' with msg:\n{ctx._cancel_msg}'
# task-contex was either cancelled by request using
# ``Portal.cancel_actor()`` or ``Context.cancel()``
# on the far end, or it was cancelled by the local
# (callee) task, so relay this cancel signal to the
# other side.
raise ContextCancelled(
msg,
suberror_type=trio.Cancelled,
canceller=canceller,
)
if ctx._cancel_msg:
msg += f' with msg:\n{ctx._cancel_msg}'
# task-contex was cancelled so relay to the cancel to caller
raise ContextCancelled(
msg,
suberror_type=trio.Cancelled,
)
else:
# regular async function
try:
@ -259,12 +311,17 @@ async def _invoke(
)
with cancel_scope as cs:
task_status.started(cs)
ctx._scope = cs
task_status.started(ctx)
result = await coro
log.cancel(f'result: {result}')
fname = func.__name__
log.runtime(f'{fname}() result: {result}')
if not failed_resp:
# only send result if we know IPC isn't down
await chan.send({'return': result, 'cid': cid})
await chan.send(
{'return': result,
'cid': cid}
)
except (
Exception,
@ -307,6 +364,7 @@ async def _invoke(
# always ship errors back to caller
err_msg = pack_error(err, tb=tb)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
@ -323,14 +381,21 @@ async def _invoke(
f"Failed to ship error to caller @ {chan.uid} !?"
)
if cs is None:
# error is from above code not from rpc invocation
# error is probably from above coro running code *not from the
# underlyingn rpc invocation* since a scope was never allocated
# around actual coroutine await.
if ctx._scope is None:
# we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this
# channel.
task_status.started(err)
finally:
# RPC task bookeeping
try:
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
ctx, func, is_complete = actor._rpc_tasks.pop(
(chan, cid)
)
is_complete.set()
except KeyError:
@ -339,6 +404,9 @@ async def _invoke(
# cancel scope will not have been inserted yet
log.warning(
f"Task {func} likely errored or cancelled before start")
else:
log.cancel(f'{func.__name__}({kwargs}) failed?')
finally:
if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed")
@ -437,6 +505,7 @@ class Actor:
self.uid = (name, uid or str(uuid.uuid4()))
self._cancel_complete = trio.Event()
self._cancel_called_remote: tuple[str, tuple] | None = None
self._cancel_called: bool = False
# retreive and store parent `__main__` data which
@ -475,7 +544,7 @@ class Actor:
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[
tuple[Channel, str],
tuple[trio.CancelScope, Callable, trio.Event]
tuple[Context, Callable, trio.Event]
] = {}
# map {actor uids -> Context}
@ -650,8 +719,8 @@ class Actor:
if (
local_nursery
):
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
if chan._cancel_called:
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
@ -784,75 +853,15 @@ class Actor:
f'\n{msg}')
return
send_chan = ctx._send_chan
log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
# XXX: we do **not** maintain backpressure and instead
# opt to relay stream overrun errors to the sender.
try:
send_chan.send_nowait(msg)
# if an error is deteced we should always
# expect it to be raised by any context (stream)
# consumer task
await ctx._maybe_raise_from_remote_msg(msg)
except trio.BrokenResourceError:
# TODO: what is the right way to handle the case where the
# local task has already sent a 'stop' / StopAsyncInteration
# to the other side but and possibly has closed the local
# feeder mem chan? Do we wait for some kind of ack or just
# let this fail silently and bubble up (currently)?
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed")
return
except trio.WouldBlock:
# XXX: always push an error even if the local
# receiver is in overrun state.
await ctx._maybe_raise_from_remote_msg(msg)
uid = chan.uid
lines = [
'Task context stream was overrun',
f'local task: {cid} @ {self.uid}',
f'remote sender: {uid}',
]
if not ctx._stream_opened:
lines.insert(
1,
f'\n*** No stream open on `{self.uid[0]}` side! ***\n'
)
text = '\n'.join(lines)
if ctx._backpressure:
log.warning(text)
try:
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
else:
try:
raise StreamOverrun(text) from None
except StreamOverrun as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
return await ctx._deliver_msg(msg)
def get_context(
self,
chan: Channel,
cid: str,
msg_buffer_size: Optional[int] = None,
msg_buffer_size: int | None = None,
allow_overruns: bool = False,
) -> Context:
'''
@ -868,6 +877,7 @@ class Actor:
assert actor_uid
try:
ctx = self._contexts[(actor_uid, cid)]
ctx._allow_overruns = allow_overruns
# adjust buffer size if specified
state = ctx._send_chan._state # type: ignore
@ -875,15 +885,11 @@ class Actor:
state.max_buffer_size = msg_buffer_size
except KeyError:
send_chan: trio.MemorySendChannel
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(
msg_buffer_size or self.msg_buffer_size)
ctx = Context(
ctx = mk_context(
chan,
cid,
_send_chan=send_chan,
_recv_chan=recv_chan,
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns,
)
self._contexts[(actor_uid, cid)] = ctx
@ -895,7 +901,8 @@ class Actor:
ns: str,
func: str,
kwargs: dict,
msg_buffer_size: Optional[int] = None,
msg_buffer_size: int | None = None,
allow_overruns: bool = False,
) -> Context:
'''
@ -909,9 +916,16 @@ class Actor:
'''
cid = str(uuid.uuid4())
assert chan.uid
ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size)
ctx = self.get_context(
chan,
cid,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
await chan.send(
{'cmd': (ns, func, kwargs, self.uid, cid)}
)
# Wait on first response msg and validate; this should be
# immediate.
@ -921,7 +935,11 @@ class Actor:
if 'error' in first_msg:
raise unpack_error(first_msg, chan)
elif functype not in ('asyncfunc', 'asyncgen', 'context'):
elif functype not in (
'asyncfunc',
'asyncgen',
'context',
):
raise ValueError(f"{first_msg} is an invalid response packet?")
ctx._remote_func_type = functype
@ -980,7 +998,7 @@ class Actor:
log.warning(
f"Failed to connect to parent @ {parent_addr},"
" closing server")
await self.cancel()
await self.cancel(requesting_uid=self.uid)
raise
async def _serve_forever(
@ -1033,7 +1051,11 @@ class Actor:
assert self._service_n
self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool:
async def cancel(
self,
requesting_uid: tuple[str, str],
) -> bool:
'''
Cancel this actor's runtime.
@ -1047,6 +1069,7 @@ class Actor:
'''
log.cancel(f"{self.uid} is trying to cancel")
self._cancel_called_remote: tuple = requesting_uid
self._cancel_called = True
# cancel all ongoing rpc tasks
@ -1060,7 +1083,7 @@ class Actor:
dbcs.cancel()
# kill all ongoing tasks
await self.cancel_rpc_tasks()
await self.cancel_rpc_tasks(requesting_uid=requesting_uid)
# stop channel server
self.cancel_server()
@ -1086,7 +1109,13 @@ class Actor:
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def _cancel_task(self, cid, chan):
async def _cancel_task(
self,
cid: str,
chan: Channel,
requesting_uid: tuple[str, str] | None = None,
) -> bool:
'''
Cancel a local task by call-id / channel.
@ -1101,35 +1130,51 @@ class Actor:
try:
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel
scope, func, is_complete = self._rpc_tasks[(chan, cid)]
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope = ctx._scope
except KeyError:
log.cancel(f"{cid} has already completed/terminated?")
return
return True
log.cancel(
f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
if (
ctx._cancel_called_remote is None
and requesting_uid
):
ctx._cancel_called_remote: tuple = requesting_uid
# don't allow cancelling this function mid-execution
# (is this necessary?)
if func is self._cancel_task:
return
return True
# TODO: shouldn't we eventually be calling ``Context.cancel()``
# directly here instead (since that method can handle both
# side's calls into it?
scope.cancel()
# wait for _invoke to mark the task complete
log.runtime(
f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
'Waiting on task to cancel:\n'
f'cid: {cid}\nfunc: {func}\n'
f'peer: {chan.uid}\n'
)
await is_complete.wait()
log.runtime(
f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
return True
async def cancel_rpc_tasks(
self,
only_chan: Optional[Channel] = None,
only_chan: Channel | None = None,
requesting_uid: tuple[str, str] | None = None,
) -> None:
'''
Cancel all existing RPC responder tasks using the cancel scope
@ -1141,7 +1186,7 @@ class Actor:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (
(chan, cid),
(scope, func, is_complete),
(ctx, func, is_complete),
) in tasks.copy().items():
if only_chan is not None:
if only_chan != chan:
@ -1149,7 +1194,11 @@ class Actor:
# TODO: this should really done in a nursery batch
if func != self._cancel_task:
await self._cancel_task(cid, chan)
await self._cancel_task(
cid,
chan,
requesting_uid=requesting_uid,
)
log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}")
@ -1235,8 +1284,8 @@ async def async_main(
Actor runtime entrypoint; start the IPC channel server, maybe connect
back to the parent, and startup all core machinery tasks.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
A "root" (or "top-level") nursery for this actor is opened here and
when cancelled/terminated effectively closes the actor's "runtime".
'''
# attempt to retreive ``trio``'s sigint handler and stash it
@ -1330,13 +1379,15 @@ async def async_main(
)
)
log.runtime("Waiting on service nursery to complete")
log.runtime("Service nursery complete")
log.runtime("Waiting on root nursery to complete")
log.runtime(
"Service nursery complete\n"
"Waiting on root nursery to complete"
)
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
log.info("Closing all actor lifetime contexts")
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not registered_with_arbiter:
@ -1357,7 +1408,14 @@ async def async_main(
await try_ship_error_to_parent(actor._parent_chan, err)
# always!
log.exception("Actor errored:")
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'str(err)'
)
case _:
log.exception("Actor errored:")
raise
finally:
@ -1424,15 +1482,16 @@ async def process_messages(
) -> bool:
'''
Process messages for the IPC transport channel async-RPC style.
This is the per-channel, low level RPC task scheduler loop.
Receive multiplexed RPC requests, spawn handler tasks and deliver
responses over or boxed errors back to the "caller" task.
Receive multiplexed RPC request messages from some remote process,
spawn handler tasks depending on request type and deliver responses
or boxed errors back to the remote caller (task).
'''
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
msg = None
msg: dict | None = None
nursery_cancelled_before_task: bool = False
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
@ -1454,7 +1513,10 @@ async def process_messages(
for (channel, cid) in actor._rpc_tasks.copy():
if channel is chan:
await actor._cancel_task(cid, channel)
await actor._cancel_task(
cid,
channel,
)
log.runtime(
f"Msg loop signalled to terminate for"
@ -1468,12 +1530,14 @@ async def process_messages(
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._push_result(chan, cid, msg)
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
# TODO: implement with ``match:`` syntax?
# process command request
try:
ns, funcname, kwargs, actorid, cid = msg['cmd']
@ -1493,13 +1557,12 @@ async def process_messages(
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(actor, funcname)
if funcname == 'cancel':
func = actor.cancel
kwargs['requesting_uid'] = chan.uid
# don't start entire actor runtime
# cancellation if this actor is in debug
# mode
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
@ -1511,43 +1574,56 @@ async def process_messages(
# msg loop and break out into
# ``async_main()``
log.cancel(
f"Actor {actor.uid} was remotely cancelled "
"Actor runtime for was remotely cancelled "
f"by {chan.uid}"
)
await _invoke(
actor, cid, chan, func, kwargs, is_rpc=False
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
log.cancel(
f'Cancelling msg loop for {chan.uid}'
)
loop_cs.cancel()
break
if funcname == '_cancel_task':
func = actor._cancel_task
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
kwargs['chan'] = chan
log.cancel(
f'Remote request to cancel task\n'
f'remote actor: {chan.uid}\n'
f'task: {cid}'
# with trio.CancelScope(shield=True):
kwargs['chan'] = chan
target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid
log.cancel(
f'Remote request to cancel task\n'
f'remote actor: {chan.uid}\n'
f'task: {target_cid}'
)
try:
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
try:
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception("failed to cancel task?")
except BaseException:
log.exception("failed to cancel task?")
continue
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func = getattr(actor, funcname)
continue
else:
# complain to client about restricted modules
try:
@ -1562,34 +1638,49 @@ async def process_messages(
log.runtime(f"Spawning task for {func}")
assert actor._service_n
try:
cs = await actor._service_n.start(
partial(_invoke, actor, cid, chan, func, kwargs),
ctx: Context = await actor._service_n.start(
partial(
_invoke,
actor,
cid,
chan,
func,
kwargs,
),
name=funcname,
)
except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
nursery_cancelled_before_task: bool = True
break
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
# if func != actor.cancel:
if isinstance(cs, Exception):
# in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself.
if isinstance(ctx, Exception):
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
f"{ctx}"
)
continue
else:
# mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
actor._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
ctx,
func,
trio.Event(),
)
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
@ -1630,7 +1721,15 @@ async def process_messages(
else:
# ship any "internal" exception (i.e. one from internal
# machinery not from an rpc task) to parent
log.exception("Actor errored:")
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was context-cancelled with,\n'
f'str(err)'
)
case _:
log.exception("Actor errored:")
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
@ -1642,7 +1741,8 @@ async def process_messages(
# msg debugging for when he machinery is brokey
log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
f"with last msg:\n{msg}"
)
# transport **was not** disconnected
return False

View File

@ -23,12 +23,13 @@ import sys
import platform
from typing import (
Any,
Awaitable,
Literal,
Optional,
Callable,
TypeVar,
TYPE_CHECKING,
)
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio
@ -59,7 +60,7 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend
_ctx: mp.context.BaseContext | None = None
_ctx: Optional[mp.context.BaseContext] = None
SpawnMethodKey = Literal[
'trio', # supported on all platforms
'mp_spawn',
@ -85,7 +86,7 @@ else:
def try_set_start_method(
key: SpawnMethodKey
) -> mp.context.BaseContext | None:
) -> Optional[mp.context.BaseContext]:
'''
Attempt to set the method for process starting, aka the "actor
spawning backend".
@ -199,37 +200,16 @@ async def cancel_on_completion(
async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:
# NOTE: code below was copied verbatim from the now deprecated
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
# string:
#
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
try:
await proc.wait()
finally:
if proc.returncode is None:
proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
@ -375,11 +355,12 @@ async def trio_proc(
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: trio.Process | None = None
proc: Optional[trio.Process] = None
try:
try:
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd)
proc = await trio.lowlevel.open_process( # type: ignore
spawn_cmd)
log.runtime(f"Started {proc}")
@ -457,14 +438,14 @@ async def trio_proc(
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
log.cancel(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally:
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')

View File

@ -23,11 +23,6 @@ from typing import (
Any,
)
import trio
from ._exceptions import NoRuntime
_current_actor: Optional['Actor'] = None # type: ignore # noqa
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
@ -37,8 +32,11 @@ _runtime_vars: dict[str, Any] = {
def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # noqa
"""Get the process-local actor instance.
"""
'''
Get the process-local actor instance.
'''
from ._exceptions import NoRuntime
if _current_actor is None and err_on_no_runtime:
raise NoRuntime("No local actor has been initialized yet")
@ -46,16 +44,20 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
'''
Bool determining if this actor is running in the top-most process.
'''
import multiprocessing as mp
return mp.current_process().name == 'MainProcess'
def debug_mode() -> bool:
"""Bool determining if "debug mode" is on which enables
'''
Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes.
"""
'''
return bool(_runtime_vars['_debug_mode'])

View File

@ -14,31 +14,36 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
Message stream types and APIs.
"""
The machinery and types behind ``Context.open_stream()``
'''
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager
from dataclasses import dataclass
from contextlib import asynccontextmanager as acm
from typing import (
Any,
Optional,
Callable,
AsyncGenerator,
AsyncIterator
AsyncIterator,
TYPE_CHECKING,
)
import warnings
import trio
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from ._exceptions import (
unpack_error,
)
from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
from .trionics import (
broadcast_receiver,
BroadcastReceiver,
)
if TYPE_CHECKING:
from ._context import Context
log = get_logger(__name__)
@ -70,9 +75,9 @@ class MsgStream(trio.abc.Channel):
'''
def __init__(
self,
ctx: 'Context', # typing: ignore # noqa
ctx: Context, # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel,
_broadcaster: Optional[BroadcastReceiver] = None,
_broadcaster: BroadcastReceiver | None = None,
) -> None:
self._ctx = ctx
@ -275,7 +280,7 @@ class MsgStream(trio.abc.Channel):
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).
@asynccontextmanager
@acm
async def subscribe(
self,
@ -335,8 +340,8 @@ class MsgStream(trio.abc.Channel):
Send a message over this stream to the far end.
'''
if self._ctx._error:
raise self._ctx._error # from None
if self._ctx._remote_error:
raise self._ctx._remote_error # from None
if self._closed:
raise trio.ClosedResourceError('This stream was already closed')
@ -344,371 +349,11 @@ class MsgStream(trio.abc.Channel):
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
@dataclass
class Context:
'''
An inter-actor, ``trio`` task communication context.
NB: This class should never be instatiated directly, it is delivered
by either runtime machinery to a remotely started task or by entering
``Portal.open_context()``.
Allows maintaining task or protocol specific state between
2 communicating actor tasks. A unique context is created on the
callee side/end for every request to a remote actor from a portal.
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, open task oriented
message streams and acts as an IPC aware inter-actor-task cancel
scope.
'''
chan: Channel
cid: str
# these are the "feeder" channels for delivering
# message values to the local task from the runtime
# msg processing loop.
_recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel
_remote_func_type: Optional[str] = None
# only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa
_result: Optional[Any] = False
_error: Optional[BaseException] = None
# status flags
_cancel_called: bool = False
_cancel_msg: Optional[str] = None
_enter_debugger_on_cancel: bool = True
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
# only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = False
async def send_yield(self, data: Any) -> None:
warnings.warn(
"`Context.send_yield()` is now deprecated. "
"Use ``MessageStream.send()``. ",
DeprecationWarning,
stacklevel=2,
)
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
async def _maybe_raise_from_remote_msg(
self,
msg: dict[str, Any],
) -> None:
'''
(Maybe) unpack and raise a msg error into the local scope
nursery for this context.
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
'''
error = msg.get('error')
if error:
# If this is an error message from a context opened by
# ``Portal.open_context()`` we want to interrupt any ongoing
# (child) tasks within that context to be notified of the remote
# error relayed here.
#
# The reason we may want to raise the remote error immediately
# is that there is no guarantee the associated local task(s)
# will attempt to read from any locally opened stream any time
# soon.
#
# NOTE: this only applies when
# ``Portal.open_context()`` has been called since it is assumed
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled) and
self._cancel_called
):
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
self._error = error
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
if self._scope_nursery:
async def raiser():
raise self._error from None
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
if not self._scope_nursery._closed: # type: ignore
self._scope_nursery.start_soon(raiser)
async def cancel(
self,
msg: Optional[str] = None,
) -> None:
'''
Cancel this inter-actor-task context.
Request that the far side cancel it's current linked context,
Timeout quickly in an attempt to sidestep 2-generals...
'''
side = 'caller' if self._portal else 'callee'
if msg:
assert side == 'callee', 'Only callee side can provide cancel msg'
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
if side == 'caller':
if not self._portal:
raise RuntimeError(
"No portal found, this is likely a callee side context"
)
cid = self.cid
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.cancel(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
# cancelled in the case where the connection is broken or
# some other network error occurred.
# if not self._portal.channel.connected():
if not self.chan.connected():
log.cancel(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
else:
log.cancel(
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
else:
self._cancel_msg = msg
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough?
# This probably gets into the discussion in
# https://github.com/goodboy/tractor/issues/36
assert self._scope_nursery
self._scope_nursery.cancel_scope.cancel()
if self._recv_chan:
await self._recv_chan.aclose()
@asynccontextmanager
async def open_stream(
self,
backpressure: Optional[bool] = True,
msg_buffer_size: Optional[int] = None,
) -> AsyncGenerator[MsgStream, None]:
'''
Open a ``MsgStream``, a bi-directional stream connected to the
cross-actor (far end) task for this ``Context``.
This context manager must be entered on both the caller and
callee for the stream to logically be considered "connected".
A ``MsgStream`` is currently "one-shot" use, meaning if you
close it you can not "re-open" it for streaming and instead you
must re-establish a new surrounding ``Context`` using
``Portal.open_context()``. In the future this may change but
currently there seems to be no obvious reason to support
"re-opening":
- pausing a stream can be done with a message.
- task errors will normally require a restart of the entire
scope of the inter-actor task context due to the nature of
``trio``'s cancellation system.
'''
actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
# Likewise if the surrounding context has been cancelled we error here
# since it likely means the surrounding block was exited or
# killed
if self._cancel_called:
task = trio.lowlevel.current_task().name
raise ContextCancelled(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
)
if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx = actor.get_context(
self.chan,
self.cid,
msg_buffer_size=msg_buffer_size,
)
ctx._backpressure = backpressure
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if ctx._recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
if self._portal:
self._portal._streams.add(stream)
try:
self._stream_opened = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# NOTE: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await stream.aclose()
finally:
if self._portal:
try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
async def result(self) -> Any:
'''
From a caller side, wait for and return the final result from
the callee side task.
'''
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
if self._result is False:
if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
msg = await self._recv_chan.receive()
try:
self._result = msg['return']
break
except KeyError as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
raise unpack_error(
msg, self._portal.channel
) from msgerr
return self._result
async def started(
self,
value: Optional[Any] = None
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal:
raise RuntimeError(
f"Caller side context {self} can not call started!")
elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")
await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
def stream(func: Callable) -> Callable:
"""Mark an async function as a streaming routine with ``@stream``.
'''
Mark an async function as a streaming routine with ``@stream``.
"""
# annotate
'''
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_stream_function = True # type: ignore
@ -734,22 +379,3 @@ def stream(func: Callable) -> Callable:
"(Or ``to_trio`` if using ``asyncio`` in guest mode)."
)
return func
def context(func: Callable) -> Callable:
"""Mark an async function as a streaming routine with ``@context``.
"""
# annotate
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore
sig = inspect.signature(func)
params = sig.parameters
if 'ctx' not in params:
raise TypeError(
"The first argument to the context function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func

View File

@ -37,7 +37,7 @@ import trio
import wrapt
from ..log import get_logger
from .._streaming import Context
from .._context import Context
__all__ = ['pub']
@ -148,7 +148,8 @@ def pub(
*,
tasks: set[str] = set(),
):
"""Publisher async generator decorator.
'''
Publisher async generator decorator.
A publisher can be called multiple times from different actors but
will only spawn a finite set of internal tasks to stream values to
@ -227,7 +228,8 @@ def pub(
running in a single actor to stream data to an arbitrary number of
subscribers. If you are ok to have a new task running for every call
to ``pub_service()`` then probably don't need this.
"""
'''
global _pubtask2lock
# handle the decorator not called with () case

View File

@ -82,6 +82,10 @@ class StackLevelAdapter(logging.LoggerAdapter):
msg: str,
) -> None:
'''
IPC level msg-ing.
'''
return self.log(5, msg)
def runtime(
@ -94,12 +98,20 @@ class StackLevelAdapter(logging.LoggerAdapter):
self,
msg: str,
) -> None:
'''
Cancellation logging, mostly for runtime reporting.
'''
return self.log(16, msg)
def pdb(
self,
msg: str,
) -> None:
'''
Debugger logging.
'''
return self.log(500, msg)
def log(self, level, msg, *args, **kwargs):

View File

@ -237,7 +237,7 @@ async def maybe_open_context(
yielded = _Cache.values[ctx_key]
except KeyError:
log.info(f'Allocating new {acm_func} for {ctx_key}')
log.debug(f'Allocating new {acm_func} for {ctx_key}')
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
@ -265,7 +265,7 @@ async def maybe_open_context(
if yielded is not None:
# if no more consumers, teardown the client
if _Cache.users <= 0:
log.info(f'De-allocating resource for {ctx_key}')
log.debug(f'De-allocating resource for {ctx_key}')
# XXX: if we're cancelled we the entry may have never
# been entered since the nursery task was killed.