Compare commits

...

85 Commits

Author SHA1 Message Date
Tyler Goodlet 88860cc288 Drop bad .close() call 2021-07-05 09:21:37 -04:00
Tyler Goodlet 7d67f54ae8 Proxy asyncio cancelleds as well 2021-07-05 09:21:37 -04:00
Tyler Goodlet e14521f3b7 Power of 2 cuz puters 2021-07-05 09:21:37 -04:00
Tyler Goodlet 02fbe9721b Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, keep that
special path coded in until such time as it becomes a problem for
ensuring zombie reaps.
2021-07-05 09:21:37 -04:00
Tyler Goodlet 7cc25fc99f WIP redo asyncio async gen streaming 2021-07-05 09:21:37 -04:00
Tyler Goodlet de3a6b9f09 Support asyncio actors with the trio spawner backend 2021-07-05 09:21:37 -04:00
Tyler Goodlet 5968f60103 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-07-05 09:21:37 -04:00
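
A rough sketch of what the built-in override enables; the `tractor._debug._set_trace` hook path is an assumption about internals (per the linked trio issue), while the PEP 553 `PYTHONBREAKPOINT` lookup itself is standard Python:

import os

# PEP 553: ``breakpoint()`` resolves ``sys.breakpointhook`` via the
# PYTHONBREAKPOINT env var at call time, so sync code can transparently
# land in a trio-aware debugger entry (hook path assumed here).
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'

def some_sync_func() -> int:
    x = 42
    breakpoint()  # enters the tty-lock aware pdb, not stock pdb
    return x
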
Tyler Goodlet ad11d0eef5 Support asyncio actors with the trio spawner backend 2021-07-05 09:21:37 -04:00
Tyler Goodlet 7e2325d999 Export portal type at top level 2021-07-05 09:21:37 -04:00
Tyler Goodlet 77c5ee5c04 Link to SC on wikipedia 2021-07-05 09:21:37 -04:00
Tyler Goodlet c8ca7e0c61 Add per actor debug mode toggle 2021-07-05 09:21:37 -04:00
Tyler Goodlet 3a84f8a8b5 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-07-05 09:21:37 -04:00
Tyler Goodlet 863617d451 Pass func refs 2021-07-05 09:21:37 -04:00
Tyler Goodlet e0ae1608ac Add initial infected asyncio error propagation test 2021-07-05 09:21:37 -04:00
Tyler Goodlet 58bff71a41 Raise any asyncio errors if in trio task on cancel 2021-07-05 09:21:37 -04:00
Tyler Goodlet 1ecfe7d95b Raise from asyncio error; fixes mypy 2021-07-05 09:21:37 -04:00
Tyler Goodlet d25049ca6b Tweak log msg 2021-07-05 09:21:37 -04:00
Tyler Goodlet 2f7de85ca7 Log error 2021-07-05 09:21:37 -04:00
Tyler Goodlet d55bed0b50 Support asyncio actors with the trio spawner backend 2021-07-05 09:21:37 -04:00
Tyler Goodlet 37693f65f1 Revert removal of `infect_asyncio` in nursery start methods 2021-07-05 09:21:37 -04:00
Tyler Goodlet 50ceceb1d6 Attempt to make mypy happy.. 2021-07-05 09:21:37 -04:00
Tyler Goodlet 53e04a01a9 Add an obnoxious error message on internal failures 2021-07-05 09:21:37 -04:00
Tyler Goodlet c3484f83f0 Wow, fix all the broken async func invoking code..
Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
2021-07-05 09:21:37 -04:00
Tyler Goodlet f32d3e1464 Drop entrypoints from `Actor` 2021-07-05 09:21:37 -04:00
Tyler Goodlet e77f5dcdfa Move asyncio guest mode entrypoint to `to_asyncio`
The function is useful if you want to run the "main process" under
`asyncio`. Until `trio` core wraps this better we'll keep our own copy
in the interim (there's a new "inside-out-guest" mode almost on
mainline so hang tight).
2021-07-05 09:21:37 -04:00
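
For reference, the guest-mode mechanics being wrapped reduce to the public `trio.lowlevel.start_guest_run()` API; a minimal standalone sketch (the `trio_main`/`asyncio_main` names are illustrative, not tractor's wrapper):

import asyncio
import trio

async def trio_main() -> str:
    await trio.sleep(0.1)
    return 'trio ran inside asyncio'

async def asyncio_main() -> None:
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    # run a trio task tree as a "guest" on top of the asyncio loop
    trio.lowlevel.start_guest_run(
        trio_main,
        run_sync_soon_threadsafe=loop.call_soon_threadsafe,
        done_callback=done.set_result,  # receives an ``outcome.Outcome``
    )
    print((await done).unwrap())  # re-raises any trio-side error

asyncio.run(asyncio_main())
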
Tyler Goodlet 4df633f30e Propagate any spawned `asyncio` task error upwards
This should mostly maintain top level SC principles for any task spawned
using `tractor.to_asyncio.run()`. When the `asyncio` task completes make
sure to cancel the pertaining `trio` cancel scope and raise any error
that may have resulted.

Resolves #120
2021-07-05 09:21:37 -04:00
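
The relay pattern described above amounts to roughly this pure-`trio` sketch (not the actual `to_asyncio` internals): the spawned task stashes its outcome, cancels the waiter's scope on completion, and the caller re-raises:

import trio

async def _run_then_wake_waiter(fn, cs: trio.CancelScope, box: dict):
    try:
        box['result'] = await fn()
    except Exception as err:
        box['error'] = err
    finally:
        # task completed: cancel the pertaining scope so the
        # waiting caller always wakes up
        cs.cancel()

async def run(fn):
    box: dict = {}
    async with trio.open_nursery() as n:
        with trio.CancelScope() as cs:
            n.start_soon(_run_then_wake_waiter, fn, cs, box)
            await trio.sleep_forever()  # park until the task completes
    if 'error' in box:
        raise box['error']  # errors surface in the caller, keeping SC
    return box['result']
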
Tyler Goodlet b733443739 Add a @pub kwarg to allow specifying a "startup response message" 2021-07-05 09:21:37 -04:00
Tyler Goodlet 57de0d6b7b Change trace to transport level 2021-07-05 09:16:20 -04:00
Tyler Goodlet 2dd7c064d3 Flip "trace" level to "transport" level logging 2021-07-05 09:15:16 -04:00
Tyler Goodlet 72a40e82cc Add fast fail test using the context api 2021-07-05 09:15:16 -04:00
Tyler Goodlet f2a1064b17 Adjust debug tests to accommodate no more root clobbering
We may now get multiple re-entries to the debugger by the `bp_forever`
sub-actor, since the root will incrementally try to cancel it only when
the tty lock is not held.
2021-07-05 09:13:12 -04:00
Tyler Goodlet 20b91e2653 Go back to only logging tbs on no debugger 2021-07-05 08:46:53 -04:00
Tyler Goodlet 29cd0cfc63 Comment hard-kill-sidestep for now since nursery version covers it? 2021-07-05 08:46:53 -04:00
Tyler Goodlet 668d785f74 Go back to only logging crashes if no pdb gets engaged 2021-07-05 08:46:53 -04:00
Tyler Goodlet 4800dfe12e Solve the root-cancels-child-in-tty-lock race
Finally this makes a cancelled root actor nursery not clobber child
tasks which request and lock the root's tty for the debugger repl.

Using an edge triggered event which is set after all fifo-lock-queued
tasks are complete, we can be sure that no lingering child tasks are
going to get interrupted during pdb use and tty lock acquisition.
Further, even if new tasks do queue up to get the lock, the root will
incrementally send cancel msgs to each sub-actor only once the tty is
not locked by a (set of) child request task(s). Add shielding around all
the critical sections where the child attempts to allocate the lock from
the root such that it won't be disrupted by cancel messages from the
root after the acquire-lock transaction has started.
2021-07-05 08:46:53 -04:00
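
The edge-triggered event logic reduces to roughly the following (names mirror the `_debug` internals referenced by later commits, e.g. `_no_remote_has_tty`, but this is an abbreviated sketch, not the real code):

from typing import Optional

import trio

_debug_lock = trio.StrictFIFOLock()
_no_remote_has_tty: Optional[trio.Event] = None

async def acquire_tty_lock():
    global _no_remote_has_tty
    if _no_remote_has_tty is None:
        # mark the tty as in use by some child request task
        _no_remote_has_tty = trio.Event()
    await _debug_lock.acquire()

def release_tty_lock():
    global _no_remote_has_tty
    _debug_lock.release()
    # edge trigger: fire only once the fifo queue has fully drained,
    # i.e. there is no owner and no queued-up requesting tasks
    if not _debug_lock.statistics().owner:
        _no_remote_has_tty.set()
        _no_remote_has_tty = None
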
Tyler Goodlet b83bf062e7 Distinguish between a local pdb unlock and the tty unlock in root 2021-07-05 08:46:53 -04:00
Tyler Goodlet 988574a456 Fix hard kill in debug mode; only do it when debug lock is empty 2021-07-05 08:46:53 -04:00
Tyler Goodlet 7ecad76adf Move some infos to runtime level 2021-07-05 08:46:53 -04:00
Tyler Goodlet f75fe17569 Add PDB level and make runtime below info but above debug 2021-07-05 08:46:53 -04:00
Tyler Goodlet 13077cbdf2 Move debugger wait inside OCA nursery 2021-07-05 08:46:53 -04:00
Tyler Goodlet e03d3c9fa8 Don't shield debugger status wait; it causes hangs 2021-07-05 08:46:53 -04:00
Tyler Goodlet c74c2956e4 Catch and delay errors in the root if debugger is active 2021-07-05 08:46:53 -04:00
Tyler Goodlet 2b4cf6157a Don't shield on root cancel; it can cause hangs 2021-07-05 08:46:53 -04:00
Tyler Goodlet 9f1f956902 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, keep that
special path coded in until such time as it becomes a problem for
ensuring zombie reaps.
2021-07-05 08:46:53 -04:00
Tyler Goodlet fcd73568a6 Add debug example that causes pdb stdin clobbering 2021-07-05 08:46:53 -04:00
Tyler Goodlet e1533d35dc Avoid mutate-during-iterate error 2021-07-05 08:46:09 -04:00
Tyler Goodlet 8371621e57 Expect context cancelled when we cancel 2021-07-05 08:46:09 -04:00
Tyler Goodlet 377b8c163c Add pre-stream open error conditions 2021-07-05 08:46:09 -04:00
Tyler Goodlet 6e75913480 De-densify some code 2021-07-05 08:46:09 -04:00
Tyler Goodlet 6f22ee8621 Always shield cancel the caller on cancel-causing-errors, add teardown logging 2021-07-05 08:46:09 -04:00
Tyler Goodlet 17fca76865 First try: pack cancelled tracebacks and ship to caller 2021-07-05 08:45:57 -04:00
Tyler Goodlet 627f1076d6 Add temp warning msg for context cancel call 2021-07-05 08:45:15 -04:00
Tyler Goodlet ced5d42cd4 Add some brief todo notes on idea of shielded breakpoint 2021-07-05 08:45:15 -04:00
Tyler Goodlet 17dc6aaa2d Consider relaying context error via raised-in-scope-nursery task 2021-07-05 08:45:13 -04:00
Tyler Goodlet 288e2b5db1 Set stream "end of channel" after shielded check!
Another face palm that was causing serious issues for code that is using
the `.shielded` feature..

Add a bunch more detailed comments for all this subtlety and hopefully
get it right once and for all. Also aggregated the `trio` errors that
should trigger closure inside `.aclose()`, hopefully that's right too.
2021-07-05 08:44:25 -04:00
Tyler Goodlet 59c8f72952 Don't clobber msg loop mem chan on rx stream close
Revert this change since it really is poking at internals and doesn't
make a lot of sense. If the context is going to be cancelled then the
msg loop will tear down the feed memory channel when ready, we don't
need to be clobbering it and confusing the runtime machinery lol.
2021-07-05 08:44:25 -04:00
Tyler Goodlet 197d291ba8 Modernize streaming tests 2021-07-05 08:44:25 -04:00
Tyler Goodlet 43ce533dbf Speedup the dynamic pubsub test 2021-07-05 08:44:25 -04:00
Tyler Goodlet f2b1ef3fc9 Add detailed ``@tractor.context`` cancellation/termination tests 2021-07-05 08:44:25 -04:00
Tyler Goodlet 87f1af0d85 Drop trailing comma 2021-07-05 08:44:25 -04:00
Tyler Goodlet 201392a586 Adjustments for non-frozen context dataclass change 2021-07-05 08:44:25 -04:00
Tyler Goodlet 83c4b930dc Wait for debugger lock task context termination 2021-07-05 08:44:25 -04:00
Tyler Goodlet 008314554c Fix exception typing 2021-07-05 08:44:25 -04:00
Tyler Goodlet 0af58522a4 Explicitly formalize context/streaming teardown
Add clear teardown semantics for `Context` such that the remote side
cancellation propagation happens only on error or if client code
explicitly requests it (either by exit flag to `Portal.open_context()`
or by manually calling `Context.cancel()`).  Add `Context.result()`
to wait on and capture the final result from a remote context function;
any lingering msg sequence will be consumed/discarded.

Changes in order to make this possible:
- pass the runtime msg loop's feeder receive channel in to the context
  on the calling (portal opening) side such that a final 'return' msg
  can be waited upon using `Context.result()` which delivers the final
  return value from the callee side `@tractor.context` async function.
- always await a final result from the target context function in
  `Portal.open_context()`'s `__aexit__()` if the context has not
  been (requested to be) cancelled by client code on block exit.
- add an internal `Context._cancel_called` for context "cancel
  requested" tracking (much like `trio`'s cancel scope).
- allow flagging a stream as terminated using an internal
  `._eoc` flag which will mark the stream as stopped for iteration.
- drop `StopAsyncIteration` catching in `.receive()`; it does
  nothing.
2021-07-05 08:44:25 -04:00
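
In practice the formalized semantics look like this (mirroring the new `tests/test_2way.py` cases shown further below; `simple_setup_teardown` is the `@tractor.context` function defined there):

import trio
import tractor

async def main():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor(
            'ctx_actor',
            enable_modules=[__name__],
        )
        async with portal.open_context(
            simple_setup_teardown,
            data=10,
        ) as (ctx, first):
            # ``first`` is the value the callee passed to ``ctx.started()``
            assert first == 11
            # either wait on the final return value explicitly..
            assert await ctx.result() == 'yo'
            # ..or request remote teardown via ``await ctx.cancel()``

        # tear down the daemon actor
        await portal.cancel_actor()

trio.run(main)
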
Tyler Goodlet f8e2d4007c Specially raise a `ContextCancelled` for a task-context rpc 2021-07-05 08:44:25 -04:00
Tyler Goodlet 7069035f8b Expose streaming components at top level 2021-07-05 08:44:25 -04:00
Tyler Goodlet 79c8b75b5d Add a specially handled `ContextCancelled` error 2021-07-05 08:44:25 -04:00
Tyler Goodlet b3437dacbe Add a multi-task streaming test 2021-07-05 08:44:25 -04:00
Tyler Goodlet 910df139ad Avoid mutate on iterate race 2021-07-05 08:44:25 -04:00
Tyler Goodlet a4a6df5b5a Only close recv chan if we get a ref 2021-07-05 08:44:25 -04:00
Tyler Goodlet 732b9fe63b Add error case 2021-07-05 08:44:25 -04:00
Tyler Goodlet 8017e55b85 Support no arg to `Context.started()` like trio 2021-07-05 08:44:25 -04:00
Tyler Goodlet 20e73c5ce7 Fix up var naming and typing 2021-07-05 08:44:25 -04:00
Tyler Goodlet 18135b46f0 Only send stop msg if not received from far end 2021-07-05 08:44:25 -04:00
Tyler Goodlet bebe26ce4b Expose msg stream types at top level 2021-07-05 08:44:25 -04:00
Tyler Goodlet ddc6c85d60 Add dynamic pubsub test using new bidir stream apis 2021-07-05 08:44:25 -04:00
Tyler Goodlet be022b8e2b Use context for remote debugger locking
A context is the natural fit (vs. a receive stream) for locking the root
proc's tty usage via its `.started()` sync point. Simplify the
`_breakpoint()` routine to be a simple async func instead of all this
"returning a coroutine" stuff from before we decided that
`tractor.breakpoint()` must be async. Use the `runtime` level for lock
logging, making it easier to trace.
2021-07-05 08:44:25 -04:00
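
A toy version of that locking pattern (illustrative names only, not the real `_debug` code): a `@tractor.context` function whose `.started()` call doubles as the "lock acquired" sync point:

import trio
import tractor

_tty_lock = trio.StrictFIFOLock()

@tractor.context
async def lock_tty(
    ctx: tractor.Context,
) -> None:
    async with _tty_lock:
        # sync point: the caller unblocks only once the lock is held
        await ctx.started('locked')
        # hold the lock until the caller exits or cancels the context
        await trio.sleep_forever()
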
Tyler Goodlet d59e9edec8 Be more pedantic with error handling 2021-07-05 08:44:25 -04:00
Tyler Goodlet 68d600d7ee Fix typing 2021-07-05 08:44:25 -04:00
Tyler Goodlet 3f1adc0b6f Parametrize with async for style tests 2021-07-05 08:44:25 -04:00
Tyler Goodlet ebf53157c0 Support passing `shield` at stream construction 2021-07-05 08:44:25 -04:00
Tyler Goodlet 3625a8cd56 Add basic test set 2021-07-05 08:44:25 -04:00
Tyler Goodlet b706cd9d4d Cancel scope on stream consumer completion 2021-07-05 08:44:25 -04:00
Tyler Goodlet babe62a511 Expose `@context` decorator at top level 2021-07-05 08:44:25 -04:00
Tyler Goodlet 4371a0a898 Add initial bi-directional streaming
This mostly adds the api described in
https://github.com/goodboy/tractor/issues/53#issuecomment-806258798

The first draft summary:
- formalize bidir streaming using the `trio.Channel` style interface
  which we derive as a `MsgStream` type.
- add `Portal.open_context()` which provides a `trio.Nursery.start()`
  remote task invocation style for setting up and tearing down task
  contexts in remote actors.
- add a distinct `'started'` message to the ipc protocol to facilitate
  `Context.started()` with a first return value.
- for our `ReceiveMsgStream` type, don't cancel the remote task in
  `.aclose()`; this is now done explicitly by the surrounding `Context`
   usage: `Context.cancel()`.
- streams in either direction still use a `'yield'` message keeping the
  proto mostly symmetric without having to worry about which side is the
  caller / portal opener.
- subtlety: only allow sending a `'stop'` message during a 2-way
  streaming context from `ReceiveStream.aclose()`, detailed comment
  with explanation is included.

Relates to #53
2021-07-05 08:44:25 -04:00
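
Putting the draft pieces together, a full round trip looks like the following (a distilled version of the new tests included below):

import trio
import tractor

@tractor.context
async def echo_server(
    ctx: tractor.Context,
) -> None:
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            await stream.send(msg)  # echo back

async def main():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor('echo', enable_modules=[__name__])

        async with portal.open_context(echo_server) as (ctx, first):
            async with ctx.open_stream() as stream:
                await stream.send('hi')
                assert await stream.receive() == 'hi'

            # explicitly tear down the remote task
            await ctx.cancel()

        await portal.cancel_actor()

trio.run(main)
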
25 changed files with 2646 additions and 461 deletions


@@ -322,6 +322,7 @@ channel`_!
 .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
 .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
 .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
+.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
 .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
 .. _trio gitter channel: https://gitter.im/python-trio/general
 .. _matrix channel: https://matrix.to/#/!tractor:matrix.org
@@ -330,7 +331,7 @@ channel`_!
 .. _messages: https://en.wikipedia.org/wiki/Message_passing
 .. _trio docs: https://trio.readthedocs.io/en/latest/
 .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
-.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
+.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
 .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
 .. _async generators: https://www.python.org/dev/peps/pep-0525/
 .. _trio-parallel: https://github.com/richardsheridan/trio-parallel


@ -0,0 +1,53 @@
'''
fast fail test with a context.
ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent
nrusery.
'''
import trio
import tractor
@tractor.context
async def sleep(
ctx: tractor.Context,
):
await trio.sleep(0.5)
await ctx.started()
await trio.sleep_forever()
async def open_ctx(
n: tractor._trionics.ActorNursery
):
# spawn both actors
portal = await n.start_actor(
name='sleeper',
enable_modules=[__name__],
)
async with portal.open_context(
sleep,
) as (ctx, first):
assert first is None
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='runtime',
) as an:
async with trio.open_nursery() as n:
n.start_soon(open_ctx, an)
await trio.sleep(0.2)
await trio.sleep(0.1)
assert 0
if __name__ == '__main__':
trio.run(main)


@@ -0,0 +1,31 @@
import trio
import tractor


async def key_error():
    "Raise a ``KeyError``"
    return {}['doggy']


async def main():
    """Root dies
    """
    async with tractor.open_nursery(
        debug_mode=True,
        loglevel='debug'
    ) as n:

        # spawn both actors
        portal = await n.run_in_actor(key_error)

        # XXX: originally a bug caused by this
        # where root would enter debugger even
        # though child should have it locked.
        with trio.fail_after(1):
            await trio.Event().wait()


if __name__ == '__main__':
    trio.run(main)

tests/test_2way.py 100644 (498 additions)

@@ -0,0 +1,498 @@
"""
Bidirectional streaming and context API.

"""
import pytest
import trio
import tractor

from conftest import tractor_test

# the general stream semantics are
# - normal termination: far end relays a stop message which
#   terminates an ongoing ``MsgStream`` iteration
# - cancel termination: context is cancelled on either side cancelling
#   the "linked" inter-actor task context

_state: bool = False


@tractor.context
async def simple_setup_teardown(
    ctx: tractor.Context,
    data: int,
    block_forever: bool = False,
) -> None:

    # startup phase
    global _state
    _state = True

    # signal to parent that we're up
    await ctx.started(data + 1)

    try:
        if block_forever:
            # block until cancelled
            await trio.sleep_forever()
        else:
            return 'yo'
    finally:
        _state = False


async def assert_state(value: bool):
    global _state
    assert _state == value


@pytest.mark.parametrize(
    'error_parent',
    [False, True],
)
@pytest.mark.parametrize(
    'callee_blocks_forever',
    [False, True],
)
def test_simple_context(
    error_parent,
    callee_blocks_forever,
):

    async def main():

        async with tractor.open_nursery() as n:

            portal = await n.start_actor(
                'simple_context',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                simple_setup_teardown,
                data=10,
                block_forever=callee_blocks_forever,
            ) as (ctx, sent):

                assert sent == 11

                if callee_blocks_forever:
                    await portal.run(assert_state, value=True)
                    await ctx.cancel()
                else:
                    assert await ctx.result() == 'yo'

            # after cancellation
            await portal.run(assert_state, value=False)

            if error_parent:
                raise ValueError

            # shut down daemon
            await portal.cancel_actor()

    if error_parent:
        try:
            trio.run(main)
        except ValueError:
            pass
    else:
        trio.run(main)


# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
# - caller context calls `Context.cancel()` while streaming
#   is ongoing resulting in callee being cancelled
# - callee calls `Context.cancel()` while streaming and caller
#   sees stream terminated in `RemoteActorError`

# TODO: future possible features
# - restart request: far end raises `ContextRestart`


@tractor.context
async def close_ctx_immediately(
    ctx: tractor.Context,
) -> None:

    await ctx.started()
    global _state

    async with ctx.open_stream():
        pass


@tractor_test
async def test_callee_closes_ctx_after_stream_open():
    'callee context closes without using stream'

    async with tractor.open_nursery() as n:

        portal = await n.start_actor(
            'fast_stream_closer',
            enable_modules=[__name__],
        )

        async with portal.open_context(
            close_ctx_immediately,

            # flag to avoid waiting the final result
            # cancel_on_exit=True,
        ) as (ctx, sent):

            assert sent is None

            with trio.fail_after(0.5):
                async with ctx.open_stream() as stream:

                    # should fall through since ``StopAsyncIteration``
                    # should be raised through translation of
                    # a ``trio.EndOfChannel`` by
                    # ``trio.abc.ReceiveChannel.__anext__()``
                    async for _ in stream:
                        assert 0
                    else:
                        # verify stream is now closed
                        try:
                            await stream.receive()
                        except trio.EndOfChannel:
                            pass

            # TODO: should be just raise the closed resource err
            # directly here to enforce not allowing a re-open
            # of a stream to the context (at least until a time of
            # if/when we decide that's a good idea?)
            try:
                async with ctx.open_stream() as stream:
                    pass
            except trio.ClosedResourceError:
                pass

        await portal.cancel_actor()


@tractor.context
async def expect_cancelled(
    ctx: tractor.Context,
) -> None:
    global _state
    _state = True

    await ctx.started()

    try:
        async with ctx.open_stream() as stream:
            async for msg in stream:
                await stream.send(msg)  # echo server

    except trio.Cancelled:
        # expected case
        _state = False
        raise

    else:
        assert 0, "Wasn't cancelled!?"


@pytest.mark.parametrize(
    'use_ctx_cancel_method',
    [False, True],
)
@tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream(
    use_ctx_cancel_method: bool,
):
    'caller context closes without using stream'

    async with tractor.open_nursery() as n:

        portal = await n.start_actor(
            'ctx_cancelled',
            enable_modules=[__name__],
        )

        async with portal.open_context(
            expect_cancelled,
        ) as (ctx, sent):
            await portal.run(assert_state, value=True)

            assert sent is None

            # call cancel explicitly
            if use_ctx_cancel_method:

                await ctx.cancel()

                try:
                    async with ctx.open_stream() as stream:
                        async for msg in stream:
                            pass

                except tractor.ContextCancelled:
                    raise  # XXX: must be propagated to __aexit__

                else:
                    assert 0, "Should have context cancelled?"

                # channel should still be up
                assert portal.channel.connected()

                # ctx is closed here
                await portal.run(assert_state, value=False)

            else:
                try:
                    with trio.fail_after(0.2):
                        await ctx.result()
                        assert 0, "Callee should have blocked!?"
                except trio.TooSlowError:
                    await ctx.cancel()
                    try:
                        async with ctx.open_stream() as stream:
                            async for msg in stream:
                                pass
                    except tractor.ContextCancelled:
                        pass
                    else:
                        assert 0, "Should have received closed resource error?"

                # ctx is closed here
                await portal.run(assert_state, value=False)

        # channel should not have been destroyed yet, only the
        # inter-actor-task context
        assert portal.channel.connected()

        # teardown the actor
        await portal.cancel_actor()


@tractor_test
async def test_multitask_caller_cancels_from_nonroot_task():

    async with tractor.open_nursery() as n:

        portal = await n.start_actor(
            'ctx_cancelled',
            enable_modules=[__name__],
        )

        async with portal.open_context(
            expect_cancelled,
        ) as (ctx, sent):

            await portal.run(assert_state, value=True)
            assert sent is None

            async with ctx.open_stream() as stream:

                async def send_msg_then_cancel():
                    await stream.send('yo')
                    await portal.run(assert_state, value=True)
                    await ctx.cancel()
                    await portal.run(assert_state, value=False)

                async with trio.open_nursery() as n:
                    n.start_soon(send_msg_then_cancel)

                    try:
                        async for msg in stream:
                            assert msg == 'yo'

                    except tractor.ContextCancelled:
                        raise  # XXX: must be propagated to __aexit__

                # channel should still be up
                assert portal.channel.connected()

                # ctx is closed here
                await portal.run(assert_state, value=False)

        # channel should not have been destroyed yet, only the
        # inter-actor-task context
        assert portal.channel.connected()

        # teardown the actor
        await portal.cancel_actor()


@tractor.context
async def cancel_self(
    ctx: tractor.Context,
) -> None:
    global _state
    _state = True

    await ctx.cancel()
    try:
        with trio.fail_after(0.1):
            await trio.sleep_forever()

    except trio.Cancelled:
        raise

    except trio.TooSlowError:
        # should never get here
        assert 0


@tractor_test
async def test_callee_cancels_before_started():
    '''callee calls `Context.cancel()` while streaming and caller
    sees stream terminated in `ContextCancelled`.

    '''
    async with tractor.open_nursery() as n:

        portal = await n.start_actor(
            'cancels_self',
            enable_modules=[__name__],
        )
        try:
            async with portal.open_context(
                cancel_self,
            ) as (ctx, sent):
                async with ctx.open_stream():
                    await trio.sleep_forever()

        # raises a special cancel signal
        except tractor.ContextCancelled as ce:
            assert ce.type == trio.Cancelled

        # teardown the actor
        await portal.cancel_actor()


@tractor.context
async def simple_rpc(
    ctx: tractor.Context,
    data: int,
) -> None:
    """Test a small ping-pong server.

    """
    # signal to parent that we're up
    await ctx.started(data + 1)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:

        count = 0
        while True:
            try:
                assert await stream.receive() == 'ping'
            except trio.EndOfChannel:
                assert count == 10
                break
            else:
                print('pong')
                await stream.send('pong')
                count += 1


@tractor.context
async def simple_rpc_with_forloop(
    ctx: tractor.Context,
    data: int,
) -> None:
    """Same as previous test but using ``async for`` syntax/api.

    """
    # signal to parent that we're up
    await ctx.started(data + 1)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:

        count = 0
        async for msg in stream:

            assert msg == 'ping'
            print('pong')
            await stream.send('pong')
            count += 1

        else:
            assert count == 10


@pytest.mark.parametrize(
    'use_async_for',
    [True, False],
)
@pytest.mark.parametrize(
    'server_func',
    [simple_rpc, simple_rpc_with_forloop],
)
def test_simple_rpc(server_func, use_async_for):
    """The simplest request response pattern.

    """
    async def main():
        async with tractor.open_nursery() as n:

            portal = await n.start_actor(
                'rpc_server',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                server_func,  # taken from pytest parameterization
                data=10,
            ) as (ctx, sent):

                assert sent == 11

                async with ctx.open_stream() as stream:

                    if use_async_for:
                        count = 0
                        # receive msgs using async for style
                        print('ping')
                        await stream.send('ping')

                        async for msg in stream:
                            assert msg == 'pong'
                            print('ping')
                            await stream.send('ping')
                            count += 1

                            if count >= 9:
                                break

                    else:
                        # classic send/receive style
                        for _ in range(10):

                            print('ping')
                            await stream.send('ping')
                            assert await stream.receive() == 'pong'

                # stream should terminate here

            # final context result(s) should be consumed here in __aexit__()

            await portal.cancel_actor()

    trio.run(main)


@@ -0,0 +1,220 @@
"""
Advanced streaming patterns using bidirectional streams and contexts.

"""
import itertools
from typing import Set, Dict, List

import trio
import tractor


_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = {
    'even': set(),
    'odd': set(),
}


async def publisher(
    seed: int = 0,
) -> None:

    global _registry

    def is_even(i):
        return i % 2 == 0

    for val in itertools.count(seed):

        sub = 'even' if is_even(val) else 'odd'

        for sub_stream in _registry[sub].copy():
            await sub_stream.send(val)

        # throttle send rate to ~1kHz
        # making it readable to a human user
        await trio.sleep(1/1000)


@tractor.context
async def subscribe(
    ctx: tractor.Context,
) -> None:

    global _registry

    # sync with caller
    await ctx.started(None)

    async with ctx.open_stream() as stream:

        # update subs list as consumer requests
        async for new_subs in stream:

            new_subs = set(new_subs)
            remove = _registry.keys() - new_subs
            print(f'setting sub to {new_subs} for {ctx.chan.uid}')

            # remove old subs (discard() since the stream may not
            # have been registered under a key yet)
            for sub in remove:
                _registry[sub].discard(stream)

            # add new subs for consumer
            for sub in new_subs:
                _registry[sub].add(stream)


async def consumer(
    subs: List[str],
) -> None:

    uid = tractor.current_actor().uid

    async with tractor.wait_for_actor('publisher') as portal:
        async with portal.open_context(subscribe) as (ctx, first):
            async with ctx.open_stream() as stream:

                # flip between the provided subs dynamically
                if len(subs) > 1:

                    for sub in itertools.cycle(subs):
                        print(f'setting dynamic sub to {sub}')
                        await stream.send([sub])

                        count = 0
                        async for value in stream:
                            print(f'{uid} got: {value}')
                            if count > 5:
                                break
                            count += 1

                else:  # static sub

                    await stream.send(subs)
                    async for value in stream:
                        print(f'{uid} got: {value}')


def test_dynamic_pub_sub():

    global _registry

    from multiprocessing import cpu_count
    cpus = cpu_count()

    async def main():
        async with tractor.open_nursery() as n:

            # name of this actor will be same as target func
            await n.run_in_actor(publisher)

            for i, sub in zip(
                range(cpus - 2),
                itertools.cycle(_registry.keys())
            ):
                await n.run_in_actor(
                    consumer,
                    name=f'consumer_{sub}',
                    subs=[sub],
                )

            # make one dynamic subscriber
            await n.run_in_actor(
                consumer,
                name='consumer_dynamic',
                subs=list(_registry.keys()),
            )

            # block until cancelled by user
            with trio.fail_after(3):
                await trio.sleep_forever()

    try:
        trio.run(main)
    except trio.TooSlowError:
        pass


@tractor.context
async def one_task_streams_and_one_handles_reqresp(
    ctx: tractor.Context,
) -> None:

    await ctx.started()

    async with ctx.open_stream() as stream:

        async def pingpong():
            '''Run a simple req/response service.

            '''
            async for msg in stream:
                print('rpc server ping')
                assert msg == 'ping'
                print('rpc server pong')
                await stream.send('pong')

        async with trio.open_nursery() as n:
            n.start_soon(pingpong)

            for _ in itertools.count():
                await stream.send('yo')
                await trio.sleep(0.01)


def test_reqresp_ontopof_streaming():
    '''Test a subactor that both streams with one task and
    spawns another which handles a small requests-response
    dialogue over the same bidir-stream.

    '''
    async def main():

        with trio.move_on_after(2):

            async with tractor.open_nursery() as n:

                # name of this actor will be same as target func
                portal = await n.start_actor(
                    'dual_tasks',
                    enable_modules=[__name__]
                )

                # flag to make sure we get at least one pong
                got_pong: bool = False

                async with portal.open_context(
                    one_task_streams_and_one_handles_reqresp,
                ) as (ctx, first):

                    assert first is None

                    async with ctx.open_stream() as stream:

                        await stream.send('ping')

                        async for msg in stream:
                            print(f'client received: {msg}')

                            assert msg in {'pong', 'yo'}

                            if msg == 'pong':
                                got_pong = True
                                await stream.send('ping')
                                print('client sent ping')

        assert got_pong

    try:
        trio.run(main)
    except trio.TooSlowError:
        pass


@@ -309,32 +309,58 @@ def test_multi_daemon_subactors(spawn, loglevel):
             next_msg = name_error_msg
 
         elif name_error_msg in before:
-            next_msg = None
+            next_msg = bp_forever_msg
 
         else:
             raise ValueError("Neither log msg was found !?")
 
-        child.sendline('c')
+        # NOTE: previously since we did not have clobber prevention
+        # in the root actor this final resume could result in the debugger
+        # tearing down since both child actors would be cancelled and it was
+        # unlikely that `bp_forever` would re-acquire the tty lock again.
+        # Now, we should have a final resumption in the root plus a possible
+        # second entry by `bp_forever`.
 
-        # first name_error failure
+        child.sendline('c')
         child.expect(r"\(Pdb\+\+\)")
         before = str(child.before.decode())
-        if next_msg:
-            assert next_msg in before
+        assert next_msg in before
 
-        child.sendline('c')
+        # XXX: hooray, the root clobbering the child here was fixed!
+        # IMO, this demonstrates the true power of SC system design.
 
-        child.expect(r"\(Pdb\+\+\)")
-        before = str(child.before.decode())
-        assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
+        # now the root actor won't clobber the bp_forever child
+        # during its first access to the debug lock, but will instead
+        # wait for the lock to release, by the edge triggered
+        # ``_debug._no_remote_has_tty`` event before sending cancel messages
+        # (via portals) to its underlings B)
+
+        # at some point here there should have been some warning msg from
+        # the root announcing it avoided a clobber of the child's lock, but
+        # it seems unreliable in testing here to grab it:
+        # assert "in use by child ('bp_forever'," in before
+
+        # wait for final error in root
+        while True:
+
+            child.sendline('c')
+            child.expect(r"\(Pdb\+\+\)")
+            before = str(child.before.decode())
+            try:
+
+                # root error should be packed as remote error
+                assert "_exceptions.RemoteActorError: ('name_error'" in before
+                break
+
+            except AssertionError:
+                assert bp_forever_msg in before
 
         try:
             child.sendline('c')
             child.expect(pexpect.EOF)
         except pexpect.exceptions.TIMEOUT:
             # Failed to exit using continue..?
             child.sendline('q')
             child.expect(pexpect.EOF)
@@ -389,7 +415,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
     child = spawn('multi_nested_subactors_error_up_through_nurseries')
 
     # startup time can be iffy
-    time.sleep(1)
+    # time.sleep(1)
 
     for i in range(12):
         try:
@@ -471,3 +497,21 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
     assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
     assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
     assert "NameError: name 'doggypants' is not defined" in before
+
+
+def test_root_cancels_child_context_during_startup(
+    spawn,
+):
+    '''Verify a fast fail in the root doesn't lock up the child reaping
+    and all while using the new context api.
+
+    '''
+    child = spawn('fast_error_in_root_after_spawn')
+    child.expect(r"\(Pdb\+\+\)")
+
+    before = str(child.before.decode())
+    assert "AssertionError" in before
+
+    child.sendline('c')
+    child.expect(pexpect.EOF)


@@ -0,0 +1,24 @@
import asyncio

import pytest
import tractor


async def sleep_and_err():
    await asyncio.sleep(0.1)
    assert 0


async def asyncio_actor():
    assert tractor.current_actor().is_infected_aio()

    await tractor.to_asyncio.run_task(sleep_and_err)


def test_infected_simple_error(arb_addr):

    async def main():
        async with tractor.open_nursery() as n:
            await n.run_in_actor(asyncio_actor, infected_asyncio=True)

    with pytest.raises(tractor.RemoteActorError) as excinfo:
        tractor.run(main, arbiter_addr=arb_addr)


@@ -32,13 +32,16 @@ async def async_gen_stream(sequence):
 
         # block indefinitely waiting to be cancelled by ``aclose()`` call
         with trio.CancelScope() as cs:
-            await trio.sleep(float('inf'))
+            await trio.sleep_forever()
             assert 0
         assert cs.cancelled_caught
 
 
 @tractor.stream
-async def context_stream(ctx, sequence):
+async def context_stream(
+    ctx: tractor.Context,
+    sequence
+):
     for i in sequence:
         await ctx.send_yield(i)
         await trio.sleep(0.1)
@@ -338,6 +341,8 @@ async def test_respawn_consumer_task(
                     print("all values streamed, BREAKING")
                     break
 
+            cs.cancel()
+
             # TODO: this is justification for a
             # ``ActorNursery.stream_from_actor()`` helper?
             await portal.cancel_actor()


@@ -5,14 +5,25 @@ tractor: An actor model micro-framework built on
 from trio import MultiError
 
 from ._ipc import Channel
-from ._streaming import Context, stream
+from ._streaming import (
+    Context,
+    ReceiveMsgStream,
+    MsgStream,
+    stream,
+    context,
+)
 from ._discovery import get_arbiter, find_actor, wait_for_actor
 from ._trionics import open_nursery
 from ._state import current_actor, is_root_process
-from ._exceptions import RemoteActorError, ModuleNotExposed
+from ._exceptions import (
+    RemoteActorError,
+    ModuleNotExposed,
+    ContextCancelled,
+)
 from ._debug import breakpoint, post_mortem
 from . import msg
 from ._root import run, run_daemon, open_root_actor
+from ._portal import Portal
 
 
 __all__ = [
@@ -21,6 +32,7 @@ __all__ = [
     'ModuleNotExposed',
     'MultiError',
     'RemoteActorError',
+    'ContextCancelled',
     'breakpoint',
     'current_actor',
     'find_actor',
@@ -29,11 +41,14 @@ __all__ = [
     'msg',
     'open_nursery',
     'open_root_actor',
+    'Portal',
     'post_mortem',
     'run',
     'run_daemon',
     'stream',
-    'wait_for_actor',
+    'context',
+    'ReceiveMsgStream',
+    'MsgStream',
     'to_asyncio',
     'wait_for_actor',
 ]


@@ -14,6 +14,7 @@ from types import ModuleType
 import sys
 import os
 from contextlib import ExitStack
+import warnings
 
 import trio  # type: ignore
 from trio_typing import TaskStatus
@@ -27,6 +28,7 @@ from ._exceptions import (
     unpack_error,
     ModuleNotExposed,
     is_multi_cancelled,
+    ContextCancelled,
     TransportClosed,
 )
 from . import _debug
@@ -44,6 +46,7 @@ class ActorFailure(Exception):
 
 
 async def _invoke(
+
     actor: 'Actor',
     cid: str,
     chan: Channel,
@@ -56,15 +59,44 @@ async def _invoke(
     """Invoke local func and deliver result(s) over provided channel.
     """
     treat_as_gen = False
-    cs = None
+
+    # possibly a traceback (not sure what typing is for this..)
+    tb = None
+
     cancel_scope = trio.CancelScope()
-    ctx = Context(chan, cid, cancel_scope)
+    cs: trio.CancelScope = None
+
+    ctx = Context(chan, cid)
+    context: bool = False
+
     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
+        sig = inspect.signature(func)
+        params = sig.parameters
+
+        # compat with old api
         kwargs['ctx'] = ctx
+
+        if 'ctx' in params:
+            warnings.warn(
+                "`@tractor.stream decorated funcs should now declare "
+                "a `stream` arg, `ctx` is now designated for use with "
+                "@tractor.context",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+
+        elif 'stream' in params:
+            assert 'stream' in params
+            kwargs['stream'] = ctx
+
         treat_as_gen = True
+
+    elif getattr(func, '_tractor_context_function', False):
+        # handle decorated ``@tractor.context`` async function
+        kwargs['ctx'] = ctx
+        context = True
+
     # errors raised inside this block are propagated back to caller
     try:
         if not (
@@ -102,52 +134,106 @@ async def _invoke(
             # `StopAsyncIteration` system here for returning a final
             # value if desired
             await chan.send({'stop': True, 'cid': cid})
-        else:
-            if treat_as_gen:
-                await chan.send({'functype': 'asyncgen', 'cid': cid})
-                # XXX: the async-func may spawn further tasks which push
-                # back values like an async-generator would but must
-                # manually construct the response dict-packet-responses as
-                # above
-                with cancel_scope as cs:
-                    task_status.started(cs)
-                    await coro
-                if not cs.cancelled_caught:
-                    # task was not cancelled so we can instruct the
-                    # far end async gen to tear down
-                    await chan.send({'stop': True, 'cid': cid})
-            else:
-                # regular async function
-                await chan.send({'functype': 'asyncfunc', 'cid': cid})
-                with cancel_scope as cs:
-                    task_status.started(cs)
-                    await chan.send({'return': await coro, 'cid': cid})
+
+        # one way @stream func that gets treated like an async gen
+        elif treat_as_gen:
+            await chan.send({'functype': 'asyncgen', 'cid': cid})
+            # XXX: the async-func may spawn further tasks which push
+            # back values like an async-generator would but must
+            # manually construct the response dict-packet-responses as
+            # above
+            with cancel_scope as cs:
+                task_status.started(cs)
+                await coro
+
+            if not cs.cancelled_caught:
+                # task was not cancelled so we can instruct the
+                # far end async gen to tear down
+                await chan.send({'stop': True, 'cid': cid})
+
+        elif context:
+            # context func with support for bi-dir streaming
+            await chan.send({'functype': 'context', 'cid': cid})
+
+            async with trio.open_nursery() as scope_nursery:
+                ctx._scope_nursery = scope_nursery
+                cs = scope_nursery.cancel_scope
+                task_status.started(cs)
+                try:
+                    await chan.send({'return': await coro, 'cid': cid})
+                except trio.Cancelled as err:
+                    tb = err.__traceback__
+
+            if cs.cancelled_caught:
+                # TODO: pack in ``trio.Cancelled.__traceback__`` here
+                # so they can be unwrapped and displayed on the caller
+                # side!
+
+                fname = func.__name__
+                if ctx._cancel_called:
+                    msg = f'{fname} cancelled itself'
+
+                elif cs.cancel_called:
+                    msg = (
+                        f'{fname} was remotely cancelled by its caller '
+                        f'{ctx.chan.uid}'
+                    )
+
+                # task-context was cancelled so relay the cancel to the caller
+                raise ContextCancelled(
                    msg,
+                    suberror_type=trio.Cancelled,
+                )
+
+        else:
+            # regular async function
+            await chan.send({'functype': 'asyncfunc', 'cid': cid})
+            with cancel_scope as cs:
+                task_status.started(cs)
+                await chan.send({'return': await coro, 'cid': cid})
 
     except (Exception, trio.MultiError) as err:
-        # TODO: maybe we'll want different "levels" of debugging
-        # eventually such as ('app', 'supervisory', 'runtime') ?
-        if not isinstance(err, trio.ClosedResourceError) and (
-            not is_multi_cancelled(err)
-        ):
-            # XXX: is there any case where we'll want to debug IPC
-            # disconnects? I can't think of a reason that inspecting
-            # this type of failure will be useful for respawns or
-            # recovery logic - the only case is some kind of strange bug
-            # in `trio` itself?
-            entered = await _debug._maybe_enter_pm(err)
-            if not entered:
-                log.exception("Actor crashed:")
+
+        if not is_multi_cancelled(err):
+
+            log.exception("Actor crashed:")
+
+            # TODO: maybe we'll want different "levels" of debugging
+            # eventually such as ('app', 'supervisory', 'runtime') ?
+
+            # if not isinstance(err, trio.ClosedResourceError) and (
+            # if not is_multi_cancelled(err) and (
+
+            entered_debug: bool = False
+            if not isinstance(err, ContextCancelled) or (
+                isinstance(err, ContextCancelled) and ctx._cancel_called
+            ):
+                # XXX: is there any case where we'll want to debug IPC
+                # disconnects as a default?
+                #
+                # I can't think of a reason that inspecting
+                # this type of failure will be useful for respawns or
+                # recovery logic - the only case is some kind of strange bug
+                # in our transport layer itself? Going to keep this
+                # open ended for now.
+
+                entered_debug = await _debug._maybe_enter_pm(err)
+
+                if not entered_debug:
+                    log.exception("Actor crashed:")
 
         # always ship errors back to caller
-        err_msg = pack_error(err)
+        err_msg = pack_error(err, tb=tb)
         err_msg['cid'] = cid
         try:
             await chan.send(err_msg)
         except trio.ClosedResourceError:
-            log.warning(
-                f"Failed to ship error to caller @ {chan.uid}")
+            # if we can't propagate the error that's a big boo boo
+            log.error(
+                f"Failed to ship error to caller @ {chan.uid} !?"
+            )
 
         if cs is None:
             # error is from above code not from rpc invocation
@@ -165,7 +251,7 @@ async def _invoke(
                 f"Task {func} likely errored or cancelled before it started")
     finally:
         if not actor._rpc_tasks:
-            log.info("All RPC tasks have completed")
+            log.runtime("All RPC tasks have completed")
             actor._ongoing_rpc_tasks.set()
@@ -180,10 +266,10 @@ _lifetime_stack: ExitStack = ExitStack()
 class Actor:
     """The fundamental concurrency primitive.
 
-    An *actor* is the combination of a regular Python or
-    ``multiprocessing.Process`` executing a ``trio`` task tree, communicating
+    An *actor* is the combination of a regular Python process
+    executing a ``trio`` task tree, communicating
     with other actors through "portals" which provide a native async API
-    around "channels".
+    around various IPC transport "channels".
     """
     is_arbiter: bool = False
@@ -196,6 +282,9 @@ class Actor:
     _parent_main_data: Dict[str, str]
     _parent_chan_cs: Optional[trio.CancelScope] = None
 
+    # if started on ``asyncio`` running ``trio`` in guest mode
+    _infected_aio: bool = False
+
     def __init__(
         self,
         name: str,
@@ -327,14 +416,18 @@ class Actor:
             raise mne
 
     async def _stream_handler(
+
         self,
         stream: trio.SocketStream,
+
     ) -> None:
         """Entry point for new inbound connections to the channel server.
+
         """
         self._no_more_peers = trio.Event()  # unset
+
         chan = Channel(stream=stream)
-        log.info(f"New connection to us {chan}")
+        log.runtime(f"New connection to us {chan}")
 
         # send/receive initial handshake response
         try:
@@ -365,11 +458,16 @@ class Actor:
             event.set()
 
         chans = self._peers[uid]
+
+        # TODO: re-use channels for new connections instead
+        # of always new ones; will require changing all the
+        # discovery funcs
         if chans:
-            log.warning(
+            log.runtime(
                 f"already have channel(s) for {uid}:{chans}?"
             )
-        log.trace(f"Registered {chan} for {uid}")  # type: ignore
+
+        log.runtime(f"Registered {chan} for {uid}")  # type: ignore
         # append new channel
         self._peers[uid].append(chan)
@@ -378,10 +476,24 @@ class Actor:
         try:
             await self._process_messages(chan)
         finally:
+
+            # channel cleanup sequence
+
+            # for (channel, cid) in self._rpc_tasks.copy():
+            #     if channel is chan:
+            #         with trio.CancelScope(shield=True):
+            #             await self._cancel_task(cid, channel)
+
+            #             # close all consumer side task mem chans
+            #             send_chan, _ = self._cids2qs[(chan.uid, cid)]
+            #             assert send_chan.cid == cid  # type: ignore
+            #             await send_chan.aclose()
+
             # Drop ref to channel so it can be gc-ed and disconnected
             log.debug(f"Releasing channel {chan} from {chan.uid}")
             chans = self._peers.get(chan.uid)
             chans.remove(chan)
+
             if not chans:
                 log.debug(f"No more channels for {chan.uid}")
                 self._peers.pop(chan.uid, None)
@@ -394,14 +506,22 @@ class Actor:
         # # XXX: is this necessary (GC should do it?)
         if chan.connected():
+
+            # if the channel is still connected it may mean the far
+            # end has not closed and we may have gotten here due to
+            # an error and so we should at least try to terminate
+            # the channel from this end gracefully.
+
             log.debug(f"Disconnecting channel {chan}")
             try:
-                # send our msg loop terminate sentinel
+                # send a msg loop terminate sentinel
                 await chan.send(None)
+
+                # XXX: do we want this?
+                # causes "[104] connection reset by peer" on other end
                 # await chan.aclose()
+
             except trio.BrokenResourceError:
-                log.exception(
-                    f"Channel for {chan.uid} was already zonked..")
+                log.warning(f"Channel for {chan.uid} was already closed")
 
     async def _push_result(
         self,
@@ -411,22 +531,32 @@ class Actor:
     ) -> None:
         """Push an RPC result to the local consumer's queue.
         """
-        actorid = chan.uid
-        assert actorid, f"`actorid` can't be {actorid}"
-        send_chan, recv_chan = self._cids2qs[(actorid, cid)]
+        # actorid = chan.uid
+        assert chan.uid, f"`chan.uid` can't be {chan.uid}"
+        send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
         assert send_chan.cid == cid  # type: ignore
 
-        if 'stop' in msg:
-            log.debug(f"{send_chan} was terminated at remote end")
-            # indicate to consumer that far end has stopped
-            return await send_chan.aclose()
+        if 'error' in msg:
+            ctx = getattr(recv_chan, '_ctx', None)
+            # if ctx:
+            #     ctx._error_from_remote_msg(msg)
+
+            # log.debug(f"{send_chan} was terminated at remote end")
+            # # indicate to consumer that far end has stopped
+            # return await send_chan.aclose()
 
         try:
-            log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
+            log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
+
             # maintain backpressure
             await send_chan.send(msg)
+
         except trio.BrokenResourceError:
+            # TODO: what is the right way to handle the case where the
+            # local task has already sent a 'stop' / StopAsyncIteration
+            # to the other side and possibly has closed the local
+            # feeder mem chan? Do we wait for some kind of ack or just
+            # let this fail silently and bubble up (currently)?
+
             # XXX: local consumer has closed their side
             # so cancel the far end streaming task
             log.warning(f"{send_chan} consumer is already closed")
@@ -435,12 +565,14 @@
         self,
         actorid: Tuple[str, str],
         cid: str
+
     ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
+
         log.debug(f"Getting result queue for {actorid} cid {cid}")
         try:
             send_chan, recv_chan = self._cids2qs[(actorid, cid)]
         except KeyError:
-            send_chan, recv_chan = trio.open_memory_channel(1000)
+            send_chan, recv_chan = trio.open_memory_channel(2*10)
             send_chan.cid = cid  # type: ignore
             recv_chan.cid = cid  # type: ignore
             self._cids2qs[(actorid, cid)] = send_chan, recv_chan
@@ -489,23 +621,33 @@ class Actor:
             task_status.started(loop_cs)
             async for msg in chan:
+
                 if msg is None:  # loop terminate sentinel
+
                     log.debug(
                         f"Cancelling all tasks for {chan} from {chan.uid}")
-                    for (channel, cid) in self._rpc_tasks:
+
+                    for (channel, cid) in self._rpc_tasks.copy():
                         if channel is chan:
                             await self._cancel_task(cid, channel)
+
+                            # close all consumer side task mem chans
+                            # send_chan, _ = self._cids2qs[(chan.uid, cid)]
+                            # assert send_chan.cid == cid  # type: ignore
+                            # await send_chan.aclose()
+
                     log.debug(
                             f"Msg loop signalled to terminate for"
                             f" {chan} from {chan.uid}")
+
                     break
 
-                log.trace(  # type: ignore
+                log.transport(  # type: ignore
                     f"Received msg {msg} from {chan.uid}")
+
                 cid = msg.get('cid')
                 if cid:
                     # deliver response to local caller/waiter
                     await self._push_result(chan, cid, msg)
+
                     log.debug(
                         f"Waiting on next msg for {chan} from {chan.uid}")
                     continue
@@ -566,7 +708,7 @@ class Actor:
             else:
                 # mark that we have ongoing rpc tasks
                 self._ongoing_rpc_tasks = trio.Event()
-                log.info(f"RPC func is {func}")
+                log.runtime(f"RPC func is {func}")
                 # store cancel scope such that the rpc task can be
                 # cancelled gracefully if requested
                 self._rpc_tasks[(chan, cid)] = (
@@ -575,7 +717,7 @@ class Actor:
                 # self.cancel() was called so kill this msg loop
                 # and break out into ``_async_main()``
                 log.warning(
-                    f"{self.uid} was remotely cancelled; "
+                    f"Actor {self.uid} was remotely cancelled; "
                     "waiting on cancellation completion..")
                 await self._cancel_complete.wait()
                 loop_cs.cancel()
@@ -1043,9 +1185,12 @@ class Actor:
             raise ValueError(f"{uid} is not a valid uid?!")
 
         chan.uid = uid
-        log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
+        log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
         return uid
 
+    def is_infected_aio(self) -> bool:
+        return self._infected_aio
+
 
 class Arbiter(Actor):
     """A special actor who knows all the other actors and always has

@@ -19,12 +19,15 @@ def parse_ipaddr(arg):
         return (str(host), int(port))
 
 
+from ._entry import _trio_main
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("--uid", type=parse_uid)
     parser.add_argument("--loglevel", type=str)
     parser.add_argument("--parent_addr", type=parse_ipaddr)
+    parser.add_argument("--asyncio", action='store_true')
     args = parser.parse_args()
 
     subactor = Actor(
@@ -36,5 +39,6 @@ if __name__ == "__main__":
     _trio_main(
         subactor,
-        parent_addr=args.parent_addr
-    )
+        parent_addr=args.parent_addr,
+        infect_asyncio=args.asyncio,
+    )


@ -1,13 +1,13 @@
""" """
Multi-core debugging for da peeps! Multi-core debugging for da peeps!
""" """
import bdb import bdb
import sys import sys
from functools import partial from functools import partial
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator from typing import Tuple, Optional, Callable, AsyncIterator
from async_generator import aclosing
import tractor import tractor
import trio import trio
@ -31,14 +31,22 @@ log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem'] __all__ = ['breakpoint', 'post_mortem']
# TODO: wrap all these in a static global class: ``DebugLock`` maybe?
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
_pdb_release_hook: Optional[Callable] = None _pdb_release_hook: Optional[Callable] = None
# actor-wide variable pointing to current task name using debugger # actor-wide variable pointing to current task name using debugger
_in_debug = False _local_task_in_debug: Optional[str] = None
# actor tree-wide actor uid that supposedly has the tty lock
_global_actor_in_debug: Optional[Tuple[str, str]] = None
# lock in root actor preventing multi-access to local tty # lock in root actor preventing multi-access to local tty
_debug_lock = trio.StrictFIFOLock() _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_local_pdb_complete: Optional[trio.Event] = None
_no_remote_has_tty: Optional[trio.Event] = None
# XXX: set by the current task waiting on the root tty lock # XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message # and must be cancelled if this actor is cancelled via message
@ -61,19 +69,19 @@ class PdbwTeardown(pdbpp.Pdb):
# TODO: figure out how to dissallow recursive .set_trace() entry # TODO: figure out how to dissallow recursive .set_trace() entry
# since that'll cause deadlock for us. # since that'll cause deadlock for us.
def set_continue(self): def set_continue(self):
global _in_debug
try: try:
super().set_continue() super().set_continue()
finally: finally:
_in_debug = False global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook() _pdb_release_hook()
def set_quit(self): def set_quit(self):
global _in_debug
try: try:
super().set_quit() super().set_quit()
finally: finally:
_in_debug = False global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook() _pdb_release_hook()
@ -102,7 +110,7 @@ class PdbwTeardown(pdbpp.Pdb):
# async with aclosing(async_stdin): # async with aclosing(async_stdin):
# async for msg in async_stdin: # async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}") # log.runtime(f"Stdin input:\n{msg}")
# # encode to bytes # # encode to bytes
# bmsg = str.encode(msg) # bmsg = str.encode(msg)
@ -116,20 +124,71 @@ class PdbwTeardown(pdbpp.Pdb):
@asynccontextmanager @asynccontextmanager
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
"""Acquire a actor local FIFO lock meant to mutex entry to a local '''Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering by multiple processes. debugger entry point to avoid tty clobbering a global root process.
"""
'''
global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
log.pdb(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
)
we_acquired = False
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
try: try:
log.debug( log.debug(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") f"entering lock checkpoint, remote task: {task_name}:{uid}"
)
we_acquired = True
await _debug_lock.acquire() await _debug_lock.acquire()
# we_acquired = True
_global_actor_in_debug = uid
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
yield
# NOTE: critical section!
# this yield is unshielded.
# IF we received a cancel during the shielded lock
# entry of some next-in-queue requesting task,
# then the resumption here will result in that
# Cancelled being raised to our caller below!
# in this case the finally below should trigger
# and the surrounding callee side context should cancel
# normally relaying back to the caller.
yield _debug_lock
finally: finally:
_debug_lock.release() # if _global_actor_in_debug == uid:
if we_acquired and _debug_lock.locked():
_debug_lock.release()
# IFF there are no more requesting tasks queued up, fire the
# "tty-unlocked" event thereby alerting any monitors of the lock that
# we are now back in the "tty unlocked" state. This is basically
# an edge-triggered signal around an empty queue of sub-actor
# tasks that may have tried to acquire the lock.
stats = _debug_lock.statistics()
if (
not stats.owner
):
log.pdb(f"No more tasks waiting on tty lock! says {uid}")
_no_remote_has_tty.set()
_no_remote_has_tty = None
_global_actor_in_debug = None
log.debug(f"TTY lock released, remote task: {task_name}:{uid}") log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
@ -144,118 +203,213 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
# signal.signal(signal.SIGINT, prior_handler) # signal.signal(signal.SIGINT, prior_handler)
@tractor.context
async def _hijack_stdin_relay_to_child( async def _hijack_stdin_relay_to_child(
ctx: tractor.Context,
subactor_uid: Tuple[str, str] subactor_uid: Tuple[str, str]
) -> AsyncIterator[str]:
) -> str:
'''Hijack the tty in the root process of an actor tree such that
the pdbpp debugger console can be allocated to a sub-actor for repl
bossing.
'''
task_name = trio.lowlevel.current_task().name
# TODO: when we get to true remote debugging # TODO: when we get to true remote debugging
# this will deliver stdin data # this will deliver stdin data?
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
async with _acquire_debug_lock(subactor_uid):
log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# with _disable_sigint(): log.debug(
"Attempting to acquire TTY lock\n"
f"remote task: {task_name}:{subactor_uid}"
)
# indicate to child that we've locked stdio log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
yield 'Locked'
# wait for cancellation of stream by child with trio.CancelScope(shield=True):
# indicating debugger is dis-engaged
await trio.sleep_forever()
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") async with _acquire_debug_lock(subactor_uid):
# indicate to child that we've locked stdio
await ctx.started('Locked')
log.pdb( # type: ignore
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
try:
assert await stream.receive() == 'pdb_unlock'
except trio.BrokenResourceError:
# XXX: there may be a race with the portal teardown
# with the calling actor which we can safely ignore
# the alternative would be sending an ack message
# and allowing the client to wait for us to teardown
# first?
pass
log.debug(
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
return "pdb_unlock_complete"
# XXX: We only make this sync in case someone wants to async def _breakpoint(
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]: debug_func,
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors. # TODO:
""" # shield: bool = False
) -> None:
'''``tractor`` breakpoint entry for engaging pdb machinery
in the root or a subactor.
'''
# TODO: is it possible to debug a trio.Cancelled except block?
# right now it seems like we can kinda do this by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded
# scope here???
# with trio.CancelScope(shield=shield):
actor = tractor.current_actor() actor = tractor.current_actor()
do_unlock = trio.Event() task_name = trio.lowlevel.current_task().name
global _local_pdb_complete, _pdb_release_hook
global _local_task_in_debug, _global_actor_in_debug
await trio.lowlevel.checkpoint()
async def wait_for_parent_stdin_hijack( async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
global _debugger_request_cs global _debugger_request_cs
with trio.CancelScope() as cs:
with trio.CancelScope(shield=True) as cs:
_debugger_request_cs = cs _debugger_request_cs = cs
try: try:
async with get_root() as portal: async with get_root() as portal:
async with portal.open_stream_from(
tractor._debug._hijack_stdin_relay_to_child,
subactor_uid=actor.uid,
) as stream:
# block until first yield above log.error('got portal')
async for val in stream:
assert val == 'Locked' # this syncs to child's ``Context.started()`` call.
task_status.started() async with portal.open_context(
# with trio.CancelScope(shield=True): tractor._debug._hijack_stdin_relay_to_child,
await do_unlock.wait() subactor_uid=actor.uid,
) as (ctx, val):
log.error('locked context')
assert val == 'Locked'
async with ctx.open_stream() as stream:
log.error('opened stream')
# unblock local caller
task_status.started()
try:
await _local_pdb_complete.wait()
finally:
# TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True):
await stream.send('pdb_unlock')
# sync with callee termination
assert await ctx.result() == "pdb_unlock_complete"
except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock')
# trigger cancellation of remote stream
break
finally: finally:
log.debug(f"Exiting debugger for actor {actor}") log.debug(f"Exiting debugger for actor {actor}")
global _in_debug global _local_task_in_debug
_in_debug = False _local_task_in_debug = None
log.debug(f"Child {actor} released parent stdio lock") log.debug(f"Child {actor} released parent stdio lock")
async def _bp(): if not _local_pdb_complete or _local_pdb_complete.is_set():
"""Async breakpoint which schedules a parent stdio lock, and once complete _local_pdb_complete = trio.Event()
enters the ``pdbpp`` debugging console.
"""
task_name = trio.lowlevel.current_task().name
global _in_debug # TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process():
# TODO: need a more robust check for the "root" actor if _local_task_in_debug:
if actor._parent_chan and not is_root_process(): if _local_task_in_debug == task_name:
if _in_debug: # this task already has the lock and is
if _in_debug == task_name: # likely recurrently entering a breakpoint
# this task already has the lock and is return
# likely recurrently entering a breakpoint
return
# if **this** actor is already in debug mode block here # if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows # waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()` # support for recursive entries to `tractor.breakpoint()`
log.warning( log.warning(f"{actor.uid} already has a debug lock, waiting...")
f"Actor {actor.uid} already has a debug lock, waiting...")
await do_unlock.wait()
await trio.sleep(0.1)
# assign unlock callback for debugger teardown hooks await _local_pdb_complete.wait()
global _pdb_release_hook await trio.sleep(0.1)
_pdb_release_hook = do_unlock.set
# mark local actor as "in debug mode" to avoid recurrent # mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process # entries/requests to the root process
_in_debug = task_name _local_task_in_debug = task_name
# this **must** be awaited by the caller and is done using the # assign unlock callback for debugger teardown hooks
# root nursery so that the debugger can continue to run without _pdb_release_hook = _local_pdb_complete.set
# being restricted by the scope of a new task nursery.
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# NOTE: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield
with trio.CancelScope(shield=True):
await actor._service_n.start(wait_for_parent_stdin_hijack) await actor._service_n.start(wait_for_parent_stdin_hijack)
elif is_root_process(): elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore
return
await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release
# block here one (at the appropriate frame *up* where # we also wait in the root-parent for any child that
# ``breakpoint()`` was awaited and begin handling stdio # may have the tty locked prior
log.debug("Entering the synchronous world of pdb") global _debug_lock
debug_func(actor)
# user code **must** await this! # TODO: wait, what about multiple root tasks acquiring
return _bp() # it though.. shrug?
# root process (us) already has it; ignore
if _global_actor_in_debug == actor.uid:
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if _debug_lock.locked():
log.warning(
'Root actor attempting to acquire active tty lock'
f' owned by {_global_actor_in_debug}')
await _debug_lock.acquire()
_global_actor_in_debug = actor.uid
_local_task_in_debug = task_name
# the lock must be released on pdb completion
def teardown():
global _local_pdb_complete, _debug_lock
global _global_actor_in_debug, _local_task_in_debug
_debug_lock.release()
_global_actor_in_debug = None
_local_task_in_debug = None
_local_pdb_complete.set()
_pdb_release_hook = teardown
# block here one (at the appropriate frame *up* where
# ``breakpoint()`` was awaited and begin handling stdio
log.debug("Entering the synchronous world of pdb")
debug_func(actor)
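
From user code all of this machinery is reached through a single call; a minimal sketch (assuming a ``debug_mode=True`` flag is accepted by ``open_nursery()`` and forwarded to the root actor setup):

    import tractor
    import trio

    async def buggy_task():
        # requests the root's tty lock then drops into pdbpp
        await tractor.breakpoint()

    async def main():
        async with tractor.open_nursery(debug_mode=True) as n:
            await n.run_in_actor(buggy_task)

    trio.run(main)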
def _mk_pdb(): def _mk_pdb():
@ -276,7 +430,7 @@ def _set_trace(actor=None):
pdb = _mk_pdb() pdb = _mk_pdb()
if actor is not None: if actor is not None:
log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore
pdb.set_trace( pdb.set_trace(
# start 2 levels up in user code # start 2 levels up in user code
@ -285,8 +439,8 @@ def _set_trace(actor=None):
else: else:
# we entered the global ``breakpoint()`` built-in from sync code # we entered the global ``breakpoint()`` built-in from sync code
global _in_debug, _pdb_release_hook global _local_task_in_debug, _pdb_release_hook
_in_debug = 'sync' _local_task_in_debug = 'sync'
def nuttin(): def nuttin():
pass pass
@ -306,7 +460,7 @@ breakpoint = partial(
def _post_mortem(actor): def _post_mortem(actor):
log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
pdb = _mk_pdb() pdb = _mk_pdb()
# custom Pdb post-mortem entry # custom Pdb post-mortem entry

View File

@ -16,12 +16,14 @@ from ._state import current_actor, _runtime_vars
@asynccontextmanager @asynccontextmanager
async def get_arbiter( async def get_arbiter(
host: str, host: str,
port: int, port: int,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote '''Return a portal instance connected to a local or remote
arbiter. arbiter.
""" '''
actor = current_actor() actor = current_actor()
if not actor: if not actor:
@ -33,16 +35,20 @@ async def get_arbiter(
yield LocalPortal(actor, Channel((host, port))) yield LocalPortal(actor, Channel((host, port)))
else: else:
async with _connect_chan(host, port) as chan: async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal: async with open_portal(chan) as arb_portal:
yield arb_portal yield arb_portal
@asynccontextmanager @asynccontextmanager
async def get_root( async def get_root(
**kwargs, **kwargs,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
host, port = _runtime_vars['_root_mailbox'] host, port = _runtime_vars['_root_mailbox']
assert host is not None assert host is not None
async with _connect_chan(host, port) as chan: async with _connect_chan(host, port) as chan:
async with open_portal(chan, **kwargs) as portal: async with open_portal(chan, **kwargs) as portal:
yield portal yield portal
@ -60,12 +66,16 @@ async def find_actor(
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if name == 'arbiter' and actor.is_arbiter: if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter") raise RuntimeError("The current actor is the arbiter")
elif sockaddr: elif sockaddr:
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
@ -83,9 +93,12 @@ async def wait_for_actor(
A portal to the first registered actor is returned. A portal to the first registered actor is returned.
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1] sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
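
Typical usage of these discovery helpers looks like the following sketch (it assumes ``find_actor()`` yields ``None`` when no matching name is registered, which is not shown in the hunk above):

    # look up a registered peer by name
    async with tractor.find_actor('data-feed') as maybe_portal:
        if maybe_portal is not None:
            await maybe_portal.cancel_actor()

    # or block until the name shows up in the registry
    async with tractor.wait_for_actor('data-feed') as portal:
        ...  # use the portal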

View File

@ -9,6 +9,7 @@ import trio # type: ignore
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from . import _state from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__) log = get_logger(__name__)
@ -20,6 +21,7 @@ def _mp_main(
forkserver_info: Tuple[Any, Any, Any, Any, Any], forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str, start_method: str,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run`` """The routine called *after fork* which invokes a fresh ``trio.run``
""" """
@ -45,7 +47,11 @@ def _mp_main(
parent_addr=parent_addr parent_addr=parent_addr
) )
try: try:
trio.run(trio_main) if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
pass # handle it the same way trio does? pass # handle it the same way trio does?
@ -57,6 +63,7 @@ def _trio_main(
actor: 'Actor', # type: ignore actor: 'Actor', # type: ignore
*, *,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
""" """
@ -66,6 +73,8 @@ def _trio_main(
log.info(f"Started new trio process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -83,7 +92,11 @@ def _trio_main(
) )
try: try:
trio.run(trio_main) if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI") log.warning(f"Actor {actor.uid} received KBI")

View File

@ -1,7 +1,7 @@
""" """
Our classy exception set. Our classy exception set.
""" """
from typing import Dict, Any from typing import Dict, Any, Optional, Type
import importlib import importlib
import builtins import builtins
import traceback import traceback
@ -15,17 +15,16 @@ _this_mod = importlib.import_module(__name__)
class RemoteActorError(Exception): class RemoteActorError(Exception):
# TODO: local reconstruction of remote exception deats # TODO: local reconstruction of remote exception deats
"Remote actor exception bundled locally" "Remote actor exception bundled locally"
def __init__(self, message, type_str, **msgdata) -> None: def __init__(
super().__init__(message) self,
for ns in [builtins, _this_mod, trio]: message: str,
try: suberror_type: Optional[Type[BaseException]] = None,
self.type = getattr(ns, type_str) **msgdata
break
except AttributeError:
continue
else:
self.type = Exception
) -> None:
super().__init__(message)
self.type = suberror_type
self.msgdata = msgdata self.msgdata = msgdata
# TODO: a trio.MultiError.catch like context manager # TODO: a trio.MultiError.catch like context manager
@ -41,6 +40,9 @@ class InternalActorError(RemoteActorError):
class TransportClosed(trio.ClosedResourceError): class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use" "Underlying channel transport was closed prior to use"
class ContextCancelled(RemoteActorError):
"Inter-actor task context cancelled itself on the callee side."
class NoResult(RuntimeError): class NoResult(RuntimeError):
"No final result is expected for this actor" "No final result is expected for this actor"
@ -54,13 +56,22 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
def pack_error(exc: BaseException) -> Dict[str, Any]: def pack_error(
exc: BaseException,
tb = None,
) -> Dict[str, Any]:
"""Create an "error message" for tranmission over """Create an "error message" for tranmission over
a channel (aka the wire). a channel (aka the wire).
""" """
if tb:
tb_str = ''.join(traceback.format_tb(tb))
else:
tb_str = traceback.format_exc()
return { return {
'error': { 'error': {
'tb_str': traceback.format_exc(), 'tb_str': tb_str,
'type_str': type(exc).__name__, 'type_str': type(exc).__name__,
} }
} }
@ -77,12 +88,35 @@ def unpack_error(
into a local ``RemoteActorError``. into a local ``RemoteActorError``.
""" """
tb_str = msg['error'].get('tb_str', '') error = msg['error']
return err_type(
f"{chan.uid}\n" + tb_str, tb_str = error.get('tb_str', '')
message = f"{chan.uid}\n" + tb_str
type_name = error['type_str']
suberror_type: Type[BaseException] = Exception
if type_name == 'ContextCancelled':
err_type = ContextCancelled
suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type
for ns in [builtins, _this_mod, trio]:
try:
suberror_type = getattr(ns, type_name)
break
except AttributeError:
continue
exc = err_type(
message,
suberror_type=suberror_type,
# unpack other fields into error type init
**msg['error'], **msg['error'],
) )
return exc
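
Together the two helpers form the error wire round-trip; roughly (``chan`` here stands in for any connected ``Channel`` with a ``.uid``):

    # pack on the erroring (remote) side..
    try:
        raise ValueError("boom")
    except ValueError as err:
        msg = pack_error(err)
        # -> {'error': {'tb_str': '...', 'type_str': 'ValueError'}}

    # ..unpack on the receiving (local) side
    exc = unpack_error(msg, chan)
    assert isinstance(exc, RemoteActorError)
    assert exc.type is ValueError  # resolved via the ``builtins`` lookup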
def is_multi_cancelled(exc: BaseException) -> bool: def is_multi_cancelled(exc: BaseException) -> bool:
"""Predicate to determine if a ``trio.MultiError`` contains only """Predicate to determine if a ``trio.MultiError`` contains only

View File

@ -1,5 +1,6 @@
""" """
Inter-process comms abstractions Inter-process comms abstractions
""" """
import platform import platform
import typing import typing
@ -61,7 +62,6 @@ class MsgpackTCPStream:
use_list=False, use_list=False,
) )
while True: while True:
try: try:
data = await self.stream.receive_some(2**10) data = await self.stream.receive_some(2**10)
@ -88,7 +88,7 @@ class MsgpackTCPStream:
else: else:
raise raise
log.trace(f"received {data}") # type: ignore log.transport(f"received {data}") # type: ignore
if data == b'': if data == b'':
raise TransportClosed( raise TransportClosed(
@ -169,6 +169,7 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, self,
destaddr: Tuple[Any, ...] = None, destaddr: Tuple[Any, ...] = None,
**kwargs **kwargs
@ -180,13 +181,21 @@ class Channel:
destaddr = destaddr or self._destaddr destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple) assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
stream = await trio.open_tcp_stream(
*destaddr,
**kwargs
)
self.msgstream = MsgpackTCPStream(stream) self.msgstream = MsgpackTCPStream(stream)
log.transport(
f'Opened channel to peer {self.laddr} -> {self.raddr}'
)
return stream return stream
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore log.transport(f"send `{item}`") # type: ignore
assert self.msgstream assert self.msgstream
await self.msgstream.send(item) await self.msgstream.send(item)
@ -205,7 +214,8 @@ class Channel:
raise raise
async def aclose(self) -> None: async def aclose(self) -> None:
log.debug(
log.transport(
f'Closing channel to {self.uid} ' f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}' f'{self.laddr} -> {self.raddr}'
) )
@ -234,11 +244,11 @@ class Channel:
await self.connect() await self.connect()
cancelled = cancel_scope.cancelled_caught cancelled = cancel_scope.cancelled_caught
if cancelled: if cancelled:
log.warning( log.transport(
"Reconnect timed out after 3 seconds, retrying...") "Reconnect timed out after 3 seconds, retrying...")
continue continue
else: else:
log.warning("Stream connection re-established!") log.transport("Stream connection re-established!")
# run any reconnection sequence # run any reconnection sequence
on_recon = self._recon_seq on_recon = self._recon_seq
if on_recon: if on_recon:
@ -247,7 +257,7 @@ class Channel:
except (OSError, ConnectionRefusedError): except (OSError, ConnectionRefusedError):
if not down: if not down:
down = True down = True
log.warning( log.transport(
f"Connection to {self.raddr} went down, waiting" f"Connection to {self.raddr} went down, waiting"
" for re-establishment") " for re-establishment")
await trio.sleep(1) await trio.sleep(1)

View File

@ -17,7 +17,12 @@ from async_generator import asynccontextmanager
from ._state import current_actor from ._state import current_actor
from ._ipc import Channel from ._ipc import Channel
from .log import get_logger from .log import get_logger
from ._exceptions import unpack_error, NoResult, RemoteActorError from ._exceptions import (
unpack_error,
NoResult,
RemoteActorError,
ContextCancelled,
)
from ._streaming import Context, ReceiveMsgStream from ._streaming import Context, ReceiveMsgStream
@ -84,7 +89,7 @@ class Portal:
ns: str, ns: str,
func: str, func: str,
kwargs, kwargs,
) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]:
"""Submit a function to be scheduled and run by actor, return the """Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str, associated caller id, response queue, response type str,
first message packet as a tuple. first message packet as a tuple.
@ -172,6 +177,7 @@ class Portal:
f"Cancelling all streams with {self.channel.uid}") f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy(): for stream in self._streams.copy():
try: try:
# with trio.CancelScope(shield=True):
await stream.aclose() await stream.aclose()
except trio.ClosedResourceError: except trio.ClosedResourceError:
# don't error the stream having already been closed # don't error the stream having already been closed
@ -289,6 +295,7 @@ class Portal:
self, self,
async_gen_func: Callable, # typing: ignore async_gen_func: Callable, # typing: ignore
**kwargs, **kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]: ) -> AsyncGenerator[ReceiveMsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func): if not inspect.isasyncgenfunction(async_gen_func):
@ -312,13 +319,23 @@ class Portal:
ctx = Context(self.channel, cid, _portal=self) ctx = Context(self.channel, cid, _portal=self)
try: try:
async with ReceiveMsgStream(ctx, recv_chan, self) as rchan: # deliver receive only stream
async with ReceiveMsgStream(ctx, recv_chan) as rchan:
self._streams.add(rchan) self._streams.add(rchan)
yield rchan yield rchan
finally: finally:
# cancel the far end task on consumer close # cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
# this ``.open_stream_from()`` api, the stream is a one
# time use and we couple the far end task's lifetime to
# the consumer's scope; we don't ever send a `'stop'`
# message right now since there shouldn't be a reason to
# stop and restart the stream, right?
try: try:
await ctx.cancel() await ctx.cancel()
except trio.ClosedResourceError: except trio.ClosedResourceError:
# if the far end terminates before we send a cancel the # if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed. # underlying transport-channel may already be closed.
@ -326,16 +343,127 @@ class Portal:
self._streams.remove(rchan) self._streams.remove(rchan)
# @asynccontextmanager @asynccontextmanager
# async def open_context( async def open_context(
# self,
# func: Callable, self,
# **kwargs, func: Callable,
# ) -> Context: **kwargs,
# # TODO
# elif resptype == 'context': # context manager style setup/teardown ) -> AsyncGenerator[Tuple[Context, Any], None]:
# # TODO likely not here though '''Open an inter-actor task context.
# raise NotImplementedError
This is a synchronous API which allows for deterministic
setup/teardown of a remote task. The yielded ``Context`` further
allows for opening bidirectional streams, explicit cancellation
and synchronized final result collection. See ``tractor.Context``.
'''
# conduct target func method structural checks
if not (
inspect.iscoroutinefunction(func)
and getattr(func, '_tractor_context_function', False)
):
raise TypeError(
f'{func} must be an async function decorated with `@tractor.context`!')
fn_mod_path, fn_name = func_deats(func)
recv_chan: Optional[trio.MemoryReceiveChannel] = None
cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs)
assert functype == 'context'
msg = await recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first = msg['started']
except KeyError:
assert msg.get('cid'), ("Received internal error at context?")
if msg.get('error'):
# raise the error message
raise unpack_error(msg, self.channel)
else:
raise
_err = None
# deliver context instance and .started() msg value in open
# tuple.
try:
async with trio.open_nursery() as scope_nursery:
ctx = Context(
self.channel,
cid,
_portal=self,
_recv_chan=recv_chan,
_scope_nursery=scope_nursery,
)
recv_chan._ctx = ctx
# await trio.lowlevel.checkpoint()
yield ctx, first
# if not ctx._cancel_called:
# await ctx.result()
# await recv_chan.aclose()
except ContextCancelled as err:
_err = err
if not ctx._cancel_called:
# context was cancelled at the far end but was
# not part of this end requesting that cancel
# so raise for the local task to respond and handle.
raise
# if the context was cancelled by client code
# then we don't need to raise since user code
# is expecting this and the block should exit.
else:
log.debug(f'Context {ctx} cancelled gracefully')
except (
trio.Cancelled,
trio.MultiError,
Exception,
) as err:
_err = err
# the context cancels itself on any cancel
# causing error.
log.error(f'Context {ctx} sending cancel to far end')
with trio.CancelScope(shield=True):
await ctx.cancel()
raise
finally:
result = await ctx.result()
# though it should be impossible for any tasks
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last
# to avoid premature stream clobbers.
if recv_chan is not None:
await recv_chan.aclose()
if _err:
if ctx._cancel_called:
log.warning(
f'Context {fn_name} cancelled by caller with\n{_err}'
)
elif _err is not None:
log.warning(
f'Context {fn_name} cancelled by callee with\n{_err}'
)
else:
log.info(
f'Context {fn_name} returned '
f'value from callee `{result}`'
)
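
End to end the new API composes like the following sketch (``echo_server`` and the actor name are hypothetical; the shape follows the code above):

    import tractor
    import trio

    @tractor.context
    async def echo_server(
        ctx: tractor.Context,
        greeting: str,
    ) -> str:
        # sync with the caller and deliver a first value
        await ctx.started('ready')
        async with ctx.open_stream() as stream:
            async for msg in stream:
                await stream.send(msg)  # echo until the caller stops
        return 'done'

    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor(
                'echoer',
                enable_modules=[__name__],
            )
            async with portal.open_context(
                echo_server,
                greeting='hi',
            ) as (ctx, first):
                assert first == 'ready'
                async with ctx.open_stream() as stream:
                    await stream.send('hello')
                    assert await stream.receive() == 'hello'
            # block exit drives teardown: the stream 'stop' ends the
            # callee's loop and ``ctx.result()`` is collected in the
            # ``finally`` above
            await portal.cancel_actor()

    trio.run(main)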
@dataclass @dataclass
@ -360,10 +488,12 @@ class LocalPortal:
@asynccontextmanager @asynccontextmanager
async def open_portal( async def open_portal(
channel: Channel, channel: Channel,
nursery: Optional[trio.Nursery] = None, nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True, start_msg_loop: bool = True,
shield: bool = False, shield: bool = False,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``. """Open a ``Portal`` through the provided ``channel``.
@ -374,6 +504,7 @@ async def open_portal(
was_connected = False was_connected = False
async with maybe_open_nursery(nursery, shield=shield) as nursery: async with maybe_open_nursery(nursery, shield=shield) as nursery:
if not channel.connected(): if not channel.connected():
await channel.connect() await channel.connect()
was_connected = True was_connected = True
@ -395,12 +526,14 @@ async def open_portal(
portal = Portal(channel) portal = Portal(channel)
try: try:
yield portal yield portal
finally: finally:
await portal.aclose() await portal.aclose()
if was_connected: if was_connected:
# cancel remote channel-msg loop # gracefully signal remote channel-msg loop
await channel.send(None) await channel.send(None)
# await channel.aclose()
# cancel background msg loop task # cancel background msg loop task
if msg_loop_cs: if msg_loop_cs:

View File

@ -171,16 +171,18 @@ async def open_root_actor(
yield actor yield actor
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
logger.exception("Actor crashed:") # with trio.CancelScope(shield=True):
await _debug._maybe_enter_pm(err) entered = await _debug._maybe_enter_pm(err)
if not entered:
logger.exception("Root actor crashed:")
# always re-raise # always re-raise
raise raise
finally: finally:
logger.info("Shutting down root actor") logger.info("Shutting down root actor")
with trio.CancelScope(shield=True): await actor.cancel()
await actor.cancel()
finally: finally:
_state._current_actor = None _state._current_actor = None
logger.info("Root actor terminated") logger.info("Root actor terminated")
@ -232,7 +234,7 @@ def run(
def run_daemon( def run_daemon(
rpc_module_paths: List[str], enable_modules: List[str],
**kwargs **kwargs
) -> None: ) -> None:
"""Spawn daemon actor which will respond to RPC. """Spawn daemon actor which will respond to RPC.
@ -241,9 +243,9 @@ def run_daemon(
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests. is meant to run forever responding to RPC requests.
""" """
kwargs['rpc_module_paths'] = list(rpc_module_paths) kwargs['enable_modules'] = list(enable_modules)
for path in rpc_module_paths: for path in enable_modules:
importlib.import_module(path) importlib.import_module(path)
return run(partial(trio.sleep, float('inf')), **kwargs) return run(partial(trio.sleep, float('inf')), **kwargs)
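
Call sites just swap the keyword (hypothetical module path):

    # before
    tractor.run_daemon(rpc_module_paths=['myapp.workers'])

    # after
    tractor.run_daemon(enable_modules=['myapp.workers'])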

View File

@ -22,7 +22,14 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override
from ._state import current_actor, is_main_process from ._state import (
current_actor,
is_main_process,
is_root_process,
_runtime_vars,
)
from ._debug import _global_actor_in_debug
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
@ -141,18 +148,40 @@ async def cancel_on_completion(
) )
else: else:
log.info( log.runtime(
f"Cancelling {portal.channel.uid} gracefully " f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}") f"after result {result}")
# cancel the process now that we have a final result # cancel the process now that we have a final result
await portal.cancel_actor() await portal.cancel_actor()
async def do_hard_kill(
proc: trio.Process,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"HARD KILLING {proc}")
proc.kill()
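
The shape being factored out here is "bounded graceful teardown, then escalate"; as a standalone sketch:

    async def reap(proc: trio.Process, timeout: float = 3) -> None:
        # give graceful teardown a bounded window..
        with trio.move_on_after(timeout) as cs:
            async with proc:  # closes pipes and waits for exit
                pass
        if cs.cancelled_caught:
            # ..then escalate: SIGKILL and reap the zombie
            proc.kill()
            await proc.wait()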
@asynccontextmanager @asynccontextmanager
async def spawn_subactor( async def spawn_subactor(
subactor: 'Actor', subactor: 'Actor',
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
infect_asyncio: bool,
): ):
spawn_cmd = [ spawn_cmd = [
sys.executable, sys.executable,
@ -177,29 +206,55 @@ async def spawn_subactor(
subactor.loglevel subactor.loglevel
] ]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
proc = await trio.open_process(spawn_cmd) proc = await trio.open_process(spawn_cmd)
try: try:
yield proc yield proc
finally: finally:
log.runtime(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/teardown # XXX: do this **after** cancellation/teardown
# to avoid killing the process too early # to avoid killing the process too early
# since trio does this internally on ``__aexit__()`` # since trio does this internally on ``__aexit__()``
# NOTE: we always "shield" join sub procs in # if (
# the outer scope since no actor zombies are # is_root_process()
# ever allowed. This ``__aexit__()`` also shields
# internally.
log.debug(f"Attempting to kill {proc}")
# NOTE: this timeout effectively does nothing right now since # # XXX: basically the pre-closing of stdstreams in a
# we are shielding the ``.wait()`` inside ``new_proc()`` which # # root-processe's ``trio.Process.aclose()`` can clobber
# will pretty much never release until the process exits. # # root-process's ``trio.Process.aclose()`` can clobber
with trio.move_on_after(3) as cs: # and _runtime_vars['_debug_mode']
async with proc: # and _global_actor_in_debug is not None
log.debug(f"Terminating {proc}") # ):
if cs.cancelled_caught: # # XXX: this is ``trio.Process.aclose()`` MINUS the
log.critical(f"HARD KILLING {proc}") # # std-streams pre-closing steps inside ``proc.__aexit__()``
proc.kill() # # (see below) which includes a ``Process.kill()`` call
# log.error(
# "Root process tty is locked in debug mode by "
# f"{_global_actor_in_debug}. If the console is hanging, you "
# "may need to trigger a KBI to kill any "
# "not-fully-initialized" " subprocesses and allow errors "
# "from `trio` to propagate"
# )
# try:
# # one more graceful wait try which can be cancelled by KBI
# # sent by user.
# await proc.wait()
# finally:
# if proc.returncode is None:
# # with trio.CancelScope(shield=True):
# # await proc.wait()
# await do_hard_kill(proc)
# else:
await do_hard_kill(proc)
async def new_proc( async def new_proc(
@ -212,7 +267,7 @@ async def new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
use_trio_run_in_process: bool = False, infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
"""Create a new ``multiprocessing.Process`` using the """Create a new ``multiprocessing.Process`` using the
@ -223,13 +278,14 @@ async def new_proc(
# mark the new actor with the global spawn method # mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method subactor._spawn_method = _spawn_method
if use_trio_run_in_process or _spawn_method == 'trio': if _spawn_method == 'trio':
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
async with spawn_subactor( async with spawn_subactor(
subactor, subactor,
parent_addr, parent_addr,
infect_asyncio=infect_asyncio
) as proc: ) as proc:
log.info(f"Started {proc}") log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
@ -277,9 +333,14 @@ async def new_proc(
# reaping more stringently without the shield # reaping more stringently without the shield
# we used to have below... # we used to have below...
# always "hard" join sub procs:
# no actor zombies allowed
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
# async with proc:
# Always "hard" join sub procs since no actor zombies
# are allowed!
# this is a "light" (cancellable) join, the hard join is
# in the enclosing scope (see above).
await proc.wait() await proc.wait()
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
@ -305,6 +366,7 @@ async def new_proc(
bind_addr=bind_addr, bind_addr=bind_addr,
parent_addr=parent_addr, parent_addr=parent_addr,
_runtime_vars=_runtime_vars, _runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status, task_status=task_status,
) )
@ -320,7 +382,7 @@ async def mp_new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
use_trio_run_in_process: bool = False, infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
@ -366,6 +428,7 @@ async def mp_new_proc(
fs_info, fs_info,
start_method, start_method,
parent_addr, parent_addr,
infect_asyncio,
), ),
# daemon=True, # daemon=True,
name=name, name=name,
@ -380,7 +443,7 @@ async def mp_new_proc(
if not proc.is_alive(): if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?") raise ActorFailure("Couldn't start sub-actor?")
log.info(f"Started {proc}") log.runtime(f"Started {proc}")
try: try:
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us

View File

@ -1,41 +1,312 @@
"""
Message stream types and APIs.
"""
import inspect import inspect
from contextlib import contextmanager # , asynccontextmanager from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Iterator, Optional from typing import (
Any, Iterator, Optional, Callable,
AsyncGenerator, Dict,
)
import warnings import warnings
import trio import trio
from ._ipc import Channel from ._ipc import Channel
from ._exceptions import unpack_error from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from .log import get_logger from .log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
@dataclass(frozen=True) # TODO: generic typing like trio's receive channel
class Context: # but with msgspec messages?
"""An IAC (inter-actor communication) context. # class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
Allows maintaining task or protocol specific state between communicating
actors. A unique context is created on the receiving end for every request
to a remote actor.
A context can be cancelled and (eventually) restarted from class ReceiveMsgStream(trio.abc.ReceiveChannel):
either side of the underlying IPC channel. """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
special behaviour for signalling stream termination across an
inter-actor ``Channel``. This is the type returned to a local task
which invoked a remote streaming function using `Portal.run()`.
A context can be used to open task oriented message streams. Termination rules:
- if the local task signals stop iteration a cancel signal is
relayed to the remote task indicating to stop streaming
- if the remote task signals the end of a stream, raise
a ``StopAsyncIteration`` to terminate the local ``async for``
""" """
def __init__(
self,
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.abc.ReceiveChannel,
shield: bool = False,
) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._shielded = shield
# flag to denote end of stream
self._eoc: bool = False
# delegate directly to underlying mem channel
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
return msg['yield']
async def receive(self):
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise trio.EndOfChannel
try:
msg = await self._rx_chan.receive()
return msg['yield']
except KeyError:
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10 match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if msg.get('stop'):
log.debug(f"{self} was stopped at remote end")
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await self.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, self._ctx.chan)
else:
raise
except (
trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
trio.Cancelled, # by local cancellation
):
# XXX: we close the stream on any of these error conditions:
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
# an ``EndOfChannel`` indicates either the internal recv
# memchan exhausted **or** we raised it just above after
# receiving a `stop` message from the far end of the stream.
# Previously this was triggered by calling ``.aclose()`` on
# the send side of the channel inside
# ``Actor._push_result()`` (should still be commented code
# there - which should eventually get removed), but now the
# 'stop' message handling has been put just above.
# TODO: Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically.
# Once we have broadcast support, we **don't** want to be
# closing this stream and not flushing a final value to
# remaining (clone) consumers who may not have been
# scheduled to receive it yet.
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
raise # propagate
@contextmanager
def shield(
self
) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
Note that here, "shielding" here guards against relaying
a ``'stop'`` message to the far end of the stream thus keeping
the stream machinery active and ready for further use, it does
not have anything to do with an internal ``trio.CancelScope``.
"""
self._shielded = True
yield self
self._shielded = False
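
This lets a consumer task be cancelled without tearing down the stream, e.g. (a sketch):

    async def consume_for_a_bit(stream) -> None:
        with trio.move_on_after(1):
            with stream.shield():
                # a timeout/cancel in here won't relay 'stop'
                # to the far end
                async for msg in stream:
                    ...
        # the far end is still streaming; ``stream`` remains usable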
async def aclose(self):
"""Cancel associated remote actor task and local memory channel
on close.
"""
# XXX: keep proper adherence to trio's `.aclose()` semantics:
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan
if rx_chan._closed:
log.warning(f"{self} is already closed")
# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return
# TODO: broadcasting to multiple consumers
# stats = rx_chan.statistics()
# if stats.open_receive_channels > 1:
# # if we've been cloned don't kill the stream
# log.debug(
# "there are still consumers running keeping stream alive")
# return
if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return
# XXX: This must be set **AFTER** the shielded test above!
self._eoc = True
# NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're
# in bidirectional mode. If we're only streaming
# *from* one side then that side **won't** have an
# entry in `Actor._cids2qs` (maybe it should though?).
# So any `yield` or `stop` msgs sent from the caller side
# will cause key errors on the callee side since there is
# no entry for a local feeder mem chan since the callee task
# isn't expecting messages to be sent by the caller.
# Thus, we must check that this context DOES NOT
# have a portal reference to ensure this is indeed the callee
# side and can relay a 'stop'.
# In the bidirectional case, `Context.open_stream()` will create
# the `Actor._cids2qs` entry from a call to
# `Actor.get_memchans()` and will send the stop message in
# ``__aexit__()`` on teardown so it **does not** need to be
# called here.
if not self._ctx._portal:
try:
# only for 2 way streams can we send
# stop from the caller side
await self._ctx.send_stop()
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
# it can't traverse the transport.
log.debug(f'Channel for {self} was already closed')
# close the local mem chan ``self._rx_chan`` ??!?
# DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
# the potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that context has
# run to completion.
# XXX: Notes on old behaviour:
# await rx_chan.aclose()
# In the receive-only case, ``Portal.open_stream_from()`` used
# to rely on this call explicitly on teardown such that a new
# call to ``.receive()`` after ``rx_chan`` had been closed, would
# result in us raising a ``trio.EndOfChannel`` (since we
# remapped the ``trio.ClosedResourceError``). However, now if for some
# reason the stream's consumer code tries to manually receive a new
# value before ``.aclose()`` is called **but** the far end has
# stopped, ``.receive()`` **must** raise ``trio.EndOfChannel`` in
# order to avoid an infinite hang on ``.__anext__()``; this is
# why we added ``self._eoc`` to denote stream closure independent
# of ``rx_chan``.
# In theory we could still use this old method and close the
# underlying msg-loop mem chan as above and then **not** check
# for ``self._eoc`` in ``.receive()`` (if for some reason we
# think that check is a bottle neck - not likely) **but** then
# we would need to map the resulting
# ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in
# ``.receive()`` (as it originally was before bi-dir streaming
# support) in order to trigger stream closure. The old behaviour
# is arguably more confusing since we lose detection of the
# runtime's closure of ``rx_chan`` in the case where we may
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
"""
Bidirectional message stream for use within an inter-actor
``Context``.
"""
async def send(
self,
data: Any
) -> None:
'''Send a message over this stream to the far end.
'''
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
# TODO: but make it broadcasting to consumers
def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return MsgStream(
self._ctx,
self._rx_chan.clone(),
)
@dataclass
class Context:
'''An inter-actor task communication context.
Allows maintaining task or protocol specific state between
2 communicating actor tasks. A unique context is created on the
callee side/end for every request to a remote actor from a portal.
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel.
A context can be used to open task oriented message streams and can
be thought of as an IPC aware inter-actor cancel scope.
'''
chan: Channel chan: Channel
cid: str cid: str
# only set on the caller side # only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa _portal: Optional['Portal'] = None # type: ignore # noqa
_recv_chan: Optional[trio.MemoryReceiveChannel] = None
_result: Optional[Any] = False
_cancel_called: bool = False
# only set on the callee side # only set on the callee side
_cancel_scope: Optional[trio.CancelScope] = None _scope_nursery: Optional[trio.Nursery] = None
async def send_yield(self, data: Any) -> None: async def send_yield(self, data: Any) -> None:
@ -50,53 +321,226 @@ class Context:
async def send_stop(self) -> None: async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid}) await self.chan.send({'stop': True, 'cid': self.cid})
def _error_from_remote_msg(
self,
msg: Dict[str, Any],
) -> None:
'''Unpack and raise a msg error into the local scope
nursery for this context.
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
'''
async def raiser():
raise unpack_error(msg, self.chan)
self._scope_nursery.start_soon(raiser)
async def cancel(self) -> None: async def cancel(self) -> None:
"""Cancel this inter-actor-task context. '''Cancel this inter-actor-task context.
Request that the far side cancel its current linked context, Request that the far side cancel its current linked context,
timeout quickly to sidestep 2-generals... Timeout quickly in an attempt to sidestep 2-generals...
""" '''
assert self._portal, ( side = 'caller' if self._portal else 'callee'
"No portal found, this is likely a callee side context")
cid = self.cid log.warning(f'Cancelling {side} side of context to {self.chan}')
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.warning(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
# NOTE: we're telling the far end actor to cancel a task self._cancel_called = True
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
if cs.cancelled_caught: if side == 'caller':
# XXX: there's no way to know if the remote task was indeed if not self._portal:
# cancelled in the case where the connection is broken or raise RuntimeError(
# some other network error occurred. "No portal found, this is likely a callee side context"
if not self._portal.channel.connected(): )
cid = self.cid
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.warning( log.warning(
"May have failed to cancel remote task " f"Cancelling stream {cid} to "
f"{cid} for {self._portal.channel.uid}") f"{self._portal.channel.uid}")
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
# cancelled in the case where the connection is broken or
# some other network error occurred.
# if not self._portal.channel.connected():
if not self.chan.connected():
log.warning(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
else:
# callee side remote task
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough?
# This probably gets into the discussion in
# https://github.com/goodboy/tractor/issues/36
self._scope_nursery.cancel_scope.cancel()
if self._recv_chan:
await self._recv_chan.aclose()
    @asynccontextmanager
    async def open_stream(
        self,
        shield: bool = False,
    ) -> AsyncGenerator[MsgStream, None]:
        '''Open a ``MsgStream``, a bi-directional stream connected to the
        cross-actor (far end) task for this ``Context``.

        This context manager must be entered on both the caller and
        callee for the stream to logically be considered "connected".

        A ``MsgStream`` is currently "one-shot" use, meaning if you
        close it you can not "re-open" it for streaming and instead you
        must re-establish a new surrounding ``Context`` using
        ``Portal.open_context()``. In the future this may change but
        currently there seems to be no obvious reason to support
        "re-opening":
        - pausing a stream can be done with a message.
        - task errors will normally require a restart of the entire
          scope of the inter-actor task context due to the nature of
          ``trio``'s cancellation system.
        '''
        actor = current_actor()

        # here we create a mem chan that corresponds to the
        # far end caller / callee.

        # NOTE: in one way streaming this only happens on the
        # caller side inside `Actor.send_cmd()` so if you try
        # to send a stop from the caller to the callee in the
        # single-direction-stream case you'll get a lookup error
        # currently.
        _, recv_chan = actor.get_memchans(
            self.chan.uid,
            self.cid
        )

        # Likewise if the surrounding context has been cancelled we error here
        # since it likely means the surrounding block was exited or
        # killed
        if self._cancel_called:
            task = trio.lowlevel.current_task().name
            raise ContextCancelled(
                f'Context around {actor.uid[0]}:{task} was already cancelled!'
            )

        # XXX: If the underlying channel feeder receive mem chan has
        # been closed then likely client code has already exited
        # a ``.open_stream()`` block prior or there was some other
        # unanticipated error or cancellation from ``trio``.
        if recv_chan._closed:
            raise trio.ClosedResourceError(
                'The underlying channel for this stream was already closed!?')

        async with MsgStream(
            ctx=self,
            rx_chan=recv_chan,
            shield=shield,
        ) as rchan:

            if self._portal:
                self._portal._streams.add(rchan)

            try:
                # ensure we aren't cancelled before delivering
                # the stream
                # await trio.lowlevel.checkpoint()
                yield rchan

            except trio.EndOfChannel:
                # likely the far end sent us a 'stop' message to
                # terminate the stream.
                raise

            else:
                # XXX: Make the stream "one-shot use". On exit, signal
                # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
                # far end.
                await self.send_stop()

            finally:
                if self._portal:
                    self._portal._streams.remove(rchan)
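For orientation, the full caller/callee round trip enabled by this API might look like the following sketch (the exact ``open_context()`` / ``started()`` usage is assumed from the surrounding code, not verbatim from this changeset; names are illustrative):

import trio
import tractor


@tractor.context
async def echo(ctx: tractor.Context) -> None:
    # callee side: ack the context, then echo until the caller stops
    await ctx.started('ready')
    async with ctx.open_stream() as stream:
        async for msg in stream:
            await stream.send(msg)


async def main() -> None:
    async with tractor.open_nursery() as n:
        portal = await n.start_actor('echoer', enable_modules=[__name__])

        async with portal.open_context(echo) as (ctx, first):
            assert first == 'ready'

            async with ctx.open_stream() as stream:
                await stream.send('hello')
                assert await stream.receive() == 'hello'

            # the stream is "one-shot"; wait on the callee's final result
            assert await ctx.result() is None

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)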
    async def result(self) -> Any:
        '''From a caller side, wait for and return the final result from
        the callee side task.
        '''
        assert self._portal, "Context.result() can not be called from callee!"
        assert self._recv_chan

        if self._result is False:

            if not self._recv_chan._closed:  # type: ignore
                # wait for a final context result consuming
                # and discarding any bi dir stream msgs still
                # in transit from the far end.
                while True:
                    msg = await self._recv_chan.receive()
                    try:
                        self._result = msg['return']
                        break
                    except KeyError:
                        if 'yield' in msg:
                            # far end task is still streaming to us..
                            log.warning(f'Remote stream delivered {msg}')
                            # so discard it
                            continue

                        elif 'stop' in msg:
                            log.debug('Remote stream terminated')
                            continue

                        # internal error should never get here
                        assert msg.get('cid'), (
                            "Received internal error at portal?")

                        raise unpack_error(msg, self._portal.channel)

        return self._result
    async def started(self, value: Optional[Any] = None) -> None:
        if self._portal:
            raise RuntimeError(
                f"Caller side context {self} can not call started!")

        await self.chan.send({'started': value, 'cid': self.cid})
    # TODO: do we need a restart api?
    # async def restart(self) -> None:
    #     # TODO
    #     pass

    # @asynccontextmanager
    # async def open_stream(
    #     self,
    # ) -> AsyncContextManager:
    #     # TODO
    #     pass
def stream(func: Callable) -> Callable:
    """Mark an async function as a streaming routine with ``@stream``.
    """
    # annotate
    # TODO: apply whatever solution ``mypy`` ends up picking for this:
    # https://github.com/python/mypy/issues/2087#issuecomment-769266912
    func._tractor_stream_function = True  # type: ignore

    sig = inspect.signature(func)
    params = sig.parameters
    if 'stream' not in params and 'ctx' in params:
@ -114,147 +558,26 @@ def stream(func):
    ):
        raise TypeError(
            "The first argument to the stream function "
            f"{func.__name__} must be `ctx: tractor.Context` "
            "(Or ``to_trio`` if using ``asyncio`` in guest mode)."
        )
    return func
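By contrast with the bi-directional context API, the legacy one-way routine marked by this decorator looks roughly like so (a sketch; ``Context.send_yield()`` is the one-way send also used by ``_msg.py`` further below, other names are illustrative):

import tractor


@tractor.stream
async def stream_squares(
    ctx: tractor.Context,
    limit: int,
) -> None:
    # one-way: values flow only from callee to caller
    for i in range(limit):
        await ctx.send_yield(i ** 2)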
def context(func: Callable) -> Callable:
    """Mark an async function as a context routine with ``@context``.
    """
    # annotate
    # TODO: apply whatever solution ``mypy`` ends up picking for this:
    # https://github.com/python/mypy/issues/2087#issuecomment-769266912
    func._tractor_context_function = True  # type: ignore

    sig = inspect.signature(func)
    params = sig.parameters
    if 'ctx' not in params:
        raise TypeError(
            "The first argument to the context function "
            f"{func.__name__} must be `ctx: tractor.Context`"
        )
    return func


class ReceiveMsgStream(trio.abc.ReceiveChannel):
    """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
    special behaviour for signalling stream termination across an
    inter-actor ``Channel``. This is the type returned to a local task
    which invoked a remote streaming function using `Portal.run()`.

    Termination rules:
    - if the local task signals stop iteration a cancel signal is
      relayed to the remote task indicating to stop streaming
    - if the remote task signals the end of a stream, raise a
      ``StopAsyncIteration`` to terminate the local ``async for``
    """
    def __init__(
        self,
        ctx: Context,
        rx_chan: trio.abc.ReceiveChannel,
        portal: 'Portal',  # type: ignore # noqa
    ) -> None:
        self._ctx = ctx
        self._rx_chan = rx_chan
        self._portal = portal
        self._shielded = False

    # delegate directly to underlying mem channel
    def receive_nowait(self):
        return self._rx_chan.receive_nowait()

    async def receive(self):
        try:
            msg = await self._rx_chan.receive()
            return msg['yield']

        except KeyError:
            # internal error should never get here
            assert msg.get('cid'), ("Received internal error at portal?")

            # TODO: handle 2 cases with 3.10 match syntax
            # - 'stop'
            # - 'error'
            # possibly just handle msg['stop'] here!

            # TODO: test that shows stream raising an expected error!!!
            if msg.get('error'):
                # raise the error message
                raise unpack_error(msg, self._portal.channel)

        except (trio.ClosedResourceError, StopAsyncIteration):
            # XXX: this indicates that a `stop` message was
            # sent by the far side of the underlying channel.
            # Currently this is triggered by calling ``.aclose()`` on
            # the send side of the channel inside
            # ``Actor._push_result()``, but maybe it should be put here?
            # to avoid exposing the internal mem chan closing mechanism?
            # in theory we could instead do some flushing of the channel
            # if needed to ensure all consumers are complete before
            # triggering closure too early?

            # Locally, we want to close this stream gracefully, by
            # terminating any local consumers tasks deterministically.
            # We **don't** want to be closing this send channel and not
            # relaying a final value to remaining consumers who may not
            # have been scheduled to receive it yet?

            # lots of testing to do here

            # when the send is closed we assume the stream has
            # terminated and signal this local iterator to stop
            await self.aclose()
            raise StopAsyncIteration

        except trio.Cancelled:
            # relay cancels to the remote task
            await self.aclose()
            raise

    @contextmanager
    def shield(
        self
    ) -> Iterator['ReceiveMsgStream']:  # noqa
        """Shield this stream's underlying channel such that a local consumer task
        can be cancelled (and possibly restarted) using ``trio.Cancelled``.
        """
        self._shielded = True
        yield self
        self._shielded = False

    async def aclose(self):
        """Cancel associated remote actor task and local memory channel
        on close.
        """
        rx_chan = self._rx_chan

        if rx_chan._closed:
            log.warning(f"{self} is already closed")
            return

        # stats = rx_chan.statistics()
        # if stats.open_receive_channels > 1:
        #     # if we've been cloned don't kill the stream
        #     log.debug(
        #         "there are still consumers running keeping stream alive")
        #     return

        if self._shielded:
            log.warning(f"{self} is shielded, portal channel being kept alive")
            return

        # close the local mem chan
        rx_chan.close()

        # cancel surrounding IPC context
        await self._ctx.cancel()

    # TODO: but make it broadcasting to consumers
    # def clone(self):
    #     """Clone this receive channel allowing for multi-task
    #     consumption from the same channel.
    #     """
    #     return ReceiveStream(
    #         self._cid,
    #         self._rx_chan.clone(),
    #         self._portal,
    #     )


# class MsgStream(ReceiveMsgStream, trio.abc.Channel):
#     """
#     Bidirectional message stream for use within an inter-actor actor
#     ``Context```.
#     """
#     async def send(
#         self,
#         data: Any
#     ) -> None:
#         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
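The eager ``ctx`` check in the new ``@context`` decorator means a mis-declared target fails at decoration time (i.e. on module import) rather than at call time; a quick sketch (illustrative names, assuming the decorator is re-exported at the package top level like ``@tractor.stream``):

import tractor


@tractor.context
async def valid(ctx: tractor.Context, msg: str) -> None:
    await ctx.started(msg)


# this would raise as soon as the module is imported:
#
#   @tractor.context
#   async def invalid(msg: str) -> None:
#       ...
#
# TypeError: The first argument to the context function
# `invalid` must be `ctx: tractor.Context`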

View File

@ -11,7 +11,8 @@ import warnings
import trio
from async_generator import asynccontextmanager

from . import _debug
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
@ -60,6 +61,8 @@ class ActorNursery:
        enable_modules: List[str] = None,
        loglevel: str = None,  # set log level per subactor
        nursery: trio.Nursery = None,
        infect_asyncio: bool = False,
        debug_mode: Optional[bool] = None,
    ) -> Portal:
        loglevel = loglevel or self._actor.loglevel or get_loglevel()
@ -67,6 +70,10 @@ class ActorNursery:
        _rtv = _state._runtime_vars.copy()
        _rtv['_is_root'] = False

        # allow setting debug policy per actor
        if debug_mode is not None:
            _rtv['_debug_mode'] = debug_mode

        enable_modules = enable_modules or []
        if rpc_module_paths:
@ -103,6 +110,7 @@ class ActorNursery:
                bind_addr,
                parent_addr,
                _rtv,  # run time vars
                infect_asyncio=infect_asyncio,
            )
        )
@ -115,6 +123,7 @@ class ActorNursery:
        rpc_module_paths: Optional[List[str]] = None,
        enable_modules: List[str] = None,
        loglevel: str = None,  # set log level per subactor
        infect_asyncio: bool = False,
        **kwargs,  # explicit args to ``fn``
    ) -> Portal:
        """Spawn a new actor, run a lone task, then terminate the actor and
@ -139,6 +148,7 @@ class ActorNursery:
            loglevel=loglevel,
            # use the run_in_actor nursery
            nursery=self._ria_nursery,
            infect_asyncio=infect_asyncio,
        )
        # XXX: don't allow stream funcs
@ -169,16 +179,25 @@ class ActorNursery:
        log.warning(f"Cancelling nursery in {self._actor.uid}")
        with trio.move_on_after(3) as cs:
            async with trio.open_nursery() as nursery:
                for subactor, proc, portal in self._children.values():

                    # TODO: are we ever even going to use this or
                    # is the spawning backend responsible for such
                    # things? I'm thinking latter.
                    if hard_kill:
                        proc.terminate()
                    else:
                        if portal is None:  # actor hasn't fully spawned yet
                            event = self._actor._peer_connected[subactor.uid]
                            log.warning(
                                f"{subactor.uid} wasn't finished spawning?")
                            await event.wait()

                            # channel/portal should now be up
                            _, _, portal = self._children[subactor.uid]
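The two new per-actor toggles compose at spawn time; a sketch (the surrounding setup is assumed, not part of this diff):

import trio
import tractor


async def main() -> None:
    async with tractor.open_nursery() as n:
        # spawn a subactor whose host event loop is ``asyncio`` with
        # ``trio`` running on top in guest mode, and enable the
        # debugger machinery for just this child
        portal = await n.start_actor(
            'aio_child',
            enable_modules=[__name__],
            infect_asyncio=True,
            debug_mode=True,
        )
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)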
@ -238,6 +257,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
        # As such if the strategy propagates any error(s) upwards
        # the above "daemon actor" nursery will be notified.
        async with trio.open_nursery() as ria_nursery:

            anursery = ActorNursery(
                actor,
                ria_nursery,
@ -248,15 +268,53 @@ async def _open_and_supervise_one_cancels_all_nursery(
                # spawning of actors happens in the caller's scope
                # after we yield upwards
                yield anursery

                log.runtime(
                    f"Waiting on subactors {anursery._children} "
                    "to complete"
                )

                # Last bit before first nursery block ends in the case
                # where we didn't error in the caller's scope

                # signal all process monitor tasks to conduct
                # hard join phase.
                anursery._join_procs.set()

            except BaseException as err:

                # If we error in the root but the debugger is
                # engaged we don't want to prematurely kill (and
                # thus clobber access to) the local tty since it
                # will make the pdb repl unusable.
                # Instead try to wait for pdb to be released before
                # tearing down.
                if is_root_process():
                    log.exception(f"we're root with {err}")

                    # wait to see if a sub-actor task
                    # will be scheduled and grab the tty
                    # lock on the next tick
                    # await trio.testing.wait_all_tasks_blocked()

                    debug_complete = _debug._no_remote_has_tty
                    if (
                        debug_complete and
                        not debug_complete.is_set()
                    ):
                        log.warning(
                            'Root has errored but pdb is in use by '
                            f'child {_debug._global_actor_in_debug}\n'
                            'Waiting on tty lock to release..')

                        with trio.CancelScope(shield=True):
                            await debug_complete.wait()

                # if the caller's scope errored then we activate our
                # one-cancels-all supervisor strategy (don't
                # worry more are coming).
                anursery._join_procs.set()

                try:
                    # XXX: hypothetically an error could be
                    # raised and then a cancel signal shows up
@ -292,15 +350,18 @@ async def _open_and_supervise_one_cancels_all_nursery(
                else:
                    raise

        # ria_nursery scope end

        # XXX: do we need a `trio.Cancelled` catch here as well?
        # this is the catch around the ``.run_in_actor()`` nursery
        except (
            Exception,
            trio.MultiError,
            trio.Cancelled
        ) as err:
            # If actor-local error was raised while waiting on
            # ".run_in_actor()" actors then we also want to cancel all
            # remaining sub-actors (due to our lone strategy:
@ -357,7 +418,8 @@ async def open_nursery(
    try:
        if actor is None and is_main_process():

            # if we are the parent process start the
            # actor runtime implicitly
            log.info("Starting actor runtime!")

            # mark us for teardown on exit
@ -366,6 +428,7 @@ async def open_nursery(
            async with open_root_actor(**kwargs) as actor:
                assert actor is current_actor()

                # try:
                async with _open_and_supervise_one_cancels_all_nursery(
                    actor
                ) as anursery:
@ -376,7 +439,6 @@ async def open_nursery(
            async with _open_and_supervise_one_cancels_all_nursery(
                actor
            ) as anursery:
                yield anursery
    finally:
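The tty-lock wait above matters whenever a child holds the debugger repl while the root is erroring; a sketch of that scenario (``tractor.breakpoint()`` and the ``run_in_actor()`` call signature are assumed from the broader debug machinery, not this hunk):

import trio
import tractor


async def child() -> None:
    await trio.sleep(0.1)
    await tractor.breakpoint()  # child acquires the root's tty lock


async def main() -> None:
    # ``debug_mode`` is forwarded through ``open_nursery()`` kwargs to
    # the root actor; if ``main`` errors while ``child`` sits in the
    # repl, the supervisor now waits on the lock instead of clobbering
    # the terminal during teardown
    async with tractor.open_nursery(debug_mode=True) as n:
        await n.run_in_actor(child)


if __name__ == '__main__':
    trio.run(main)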

View File

@ -29,19 +29,20 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS = {
    'GARBAGE': 1,
    'TRANSPORT': 5,
    'RUNTIME': 15,
    'PDB': 500,
    'QUIET': 1000,
}
STD_PALETTE = {
    'CRITICAL': 'red',
    'ERROR': 'red',
    'PDB': 'white',
    'WARNING': 'yellow',
    'INFO': 'green',
    'RUNTIME': 'white',
    'DEBUG': 'white',
    'TRANSPORT': 'cyan',
    'GARBAGE': 'blue',
}
BOLD_PALETTE = {
@ -76,7 +77,7 @@ def get_logger(
    # additional levels
    for name, val in LEVELS.items():
        logging.addLevelName(val, name)
        # ex. create ``logger.runtime()``
        setattr(logger, name.lower(), partial(logger.log, val))
    return logger
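Since each entry in ``LEVELS`` becomes a lowercase method on the logger via the ``setattr`` above, the renamed levels are used like so (a sketch; the logger name and messages are arbitrary):

from tractor.log import get_logger

log = get_logger('my_module')

# each custom level from ``LEVELS`` is exposed as a method
log.transport('raw wire-level chatter')   # level 5
log.runtime('actor runtime lifecycle')    # level 15
log.pdb('entering debugger repl')         # level 500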

View File

@ -1,9 +1,13 @@
"""
Messaging pattern APIs and helpers.

NOTE: this module is likely deprecated by the new bi-directional streaming
support provided by ``tractor.Context.open_stream()`` and friends.
"""
import inspect
import typing
from typing import Dict, Any, Set, Callable, List, Tuple
from functools import partial

from async_generator import aclosing
@ -20,7 +24,7 @@ log = get_logger('messaging')
async def fan_out_to_ctxs(
    pub_async_gen_func: typing.Callable,  # it's an async gen ... gd mypy
    topics2ctxs: Dict[str, list],
    packetizer: typing.Callable = None,
) -> None:
    """Request and fan out quotes to each subscribed actor channel.
@ -34,24 +38,27 @@ async def fan_out_to_ctxs(
    async for published in pub_gen:

        ctx_payloads: List[Tuple[Context, Any]] = []

        for topic, data in published.items():
            log.debug(f"publishing {topic, data}")

            # build a new dict packet or invoke provided packetizer
            if packetizer is None:
                packet = {topic: data}
            else:
                packet = packetizer(topic, data)

            for ctx in topics2ctxs.get(topic, list()):
                ctx_payloads.append((ctx, packet))

        if not ctx_payloads:
            log.debug(f"Unconsumed values:\n{published}")

        # deliver to each subscriber (fan out)
        if ctx_payloads:
            for ctx, payload in ctx_payloads:
                try:
                    await ctx.send_yield(payload)
                except (
@ -60,15 +67,24 @@ async def fan_out_to_ctxs(
                    ConnectionRefusedError,
                ):
                    log.warning(f"{ctx.chan} went down?")
                    for ctx_list in topics2ctxs.values():
                        try:
                            ctx_list.remove(ctx)
                        except ValueError:
                            continue

        if not get_topics():
            log.warning(f"No subscribers left for {pub_gen}")
            break


def modify_subs(
    topics2ctxs: Dict[str, List[Context]],
    topics: Set[str],
    ctx: Context,
) -> None:
    """Absolute symbol subscription list for each quote stream.

    Effectively a symbol subscription api.
@ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx):
    # update map from each symbol to requesting client's chan
    for topic in topics:
        topics2ctxs.setdefault(topic, list()).append(ctx)

    # remove any existing symbol subscriptions if symbol is not
    # found in ``symbols``
@ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx):
    for topic in filter(
        lambda topic: topic not in topics, topics2ctxs.copy()
    ):
        ctx_list = topics2ctxs.get(topic)
        if ctx_list:
            try:
                ctx_list.remove(ctx)
            except ValueError:
                pass

        if not ctx_list:
            # pop empty sets which will trigger bg quoter task termination
            topics2ctxs.pop(topic)
@ -101,6 +121,7 @@ def pub(
    wrapped: typing.Callable = None,
    *,
    tasks: Set[str] = set(),
    send_on_connect: Any = None,
):
    """Publisher async generator decorator.
@ -186,7 +207,7 @@ def pub(
    # handle the decorator not called with () case
    if wrapped is None:
        return partial(pub, tasks=tasks, send_on_connect=send_on_connect)

    task2lock: Dict[str, trio.StrictFIFOLock] = {}
@ -229,6 +250,11 @@ def pub(
            try:
                modify_subs(topics2ctxs, topics, ctx)

                # if specified send the startup message back to consumer
                if send_on_connect is not None:
                    await ctx.send_yield(send_on_connect)

                # block and let existing feed task deliver
                # stream data until it is cancelled in which case
                # the next waiting task will take over and spawn it again
@ -256,7 +282,7 @@ def pub(
                respawn = True
            finally:
                # remove all subs for this context
                modify_subs(topics2ctxs, set(), ctx)

                # if there are truly no more subscriptions with this broker
                # drop from broker subs dict
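With the new kwarg a publisher can greet each subscriber on attach; a sketch (the ``get_topics`` parameter follows this module's existing decorator contract, topic values are illustrative):

import trio
from tractor.msg import pub


@pub(send_on_connect={'status': 'connected'})
async def ticker(get_topics):
    # yield ``{topic: data}`` packets; each packet is fanned out to
    # every context currently subscribed to that topic, and each new
    # subscriber first receives the ``send_on_connect`` message
    i = 0
    while True:
        yield {topic: i for topic in get_topics()}
        i += 1
        await trio.sleep(0.1)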

View File

@ -78,7 +78,7 @@ def tractor_test(fn):
        else:
            # use implicit root actor start
            main = partial(fn, *args, **kwargs)

        return trio.run(main)

        # arbiter_addr=arb_addr,
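The one-character fix here is worth spelling out: the stray trailing comma on the old line built a one-element tuple rather than a callable, so ``trio.run()`` would blow up. A minimal repro:

from functools import partial

async def fn():
    return 42

broken = partial(fn),   # note the comma: this is a 1-tuple, not a partial
fixed = partial(fn)

assert isinstance(broken, tuple)
# trio.run(broken) -> TypeError, since a tuple isn't an async callable
# trio.run(fixed)  -> returns 42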

View File

@ -0,0 +1,311 @@
"""
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
"""
import asyncio
import inspect
from typing import (
Any,
Callable,
AsyncIterator,
Awaitable,
)
import trio
from .log import get_logger
from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run_task', 'run_as_asyncio_guest']
async def run_coro(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
) -> None:
"""Await ``coro`` and relay result back to ``trio``.
"""
to_trio.send_nowait(await coro)
async def consume_asyncgen(
to_trio: trio.MemorySendChannel,
coro: AsyncIterator,
) -> None:
"""Stream async generator results back to ``trio``.
``from_trio`` might eventually be used here for
bidirectional streaming.
"""
async for item in coro:
to_trio.send_nowait(item)
def _run_asyncio_task(
    func: Callable,
    *,
    qsize: int = 1,
    _treat_as_stream: bool = False,
    **kwargs,
) -> Any:
    """Run an ``asyncio`` async function or generator in a task, return
    or stream the result back to ``trio``.
    """
    assert current_actor().is_infected_aio()

    # ITC (inter task comms)
    from_trio = asyncio.Queue(qsize)  # type: ignore
    to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore
    from_aio._err = None

    args = tuple(inspect.getfullargspec(func).args)

    if getattr(func, '_tractor_stream_function', None):
        # the assumption is that if the target async routine accepts
        # the send channel then it intends to yield more than one
        # return value, otherwise it would just return ;P
        # _treat_as_stream = True
        assert qsize > 1

    # allow target func to accept/stream results manually by name
    if 'to_trio' in args:
        kwargs['to_trio'] = to_trio
    if 'from_trio' in args:
        kwargs['from_trio'] = from_trio
    # if 'from_aio' in args:
    #     kwargs['from_aio'] = from_aio

    coro = func(**kwargs)

    # cancel_scope = trio.CancelScope()

    # start the asyncio task we submitted from trio
    if inspect.isawaitable(coro):
        task = asyncio.create_task(run_coro(to_trio, coro))
    elif inspect.isasyncgen(coro):
        task = asyncio.create_task(consume_asyncgen(to_trio, coro))
    else:
        raise TypeError(f"No support for invoking {coro}")

    aio_err = None

    def cancel_trio(task):
        """Cancel the calling ``trio`` task on error.
        """
        nonlocal aio_err
        try:
            aio_err = task.exception()
        except asyncio.CancelledError as cerr:
            aio_err = cerr

        if aio_err:
            log.exception(f"asyncio task errored:\n{aio_err}")
            # cancel_scope.cancel()
            from_aio._err = aio_err

    task.add_done_callback(cancel_trio)

    return task, from_aio, to_trio
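The ``add_done_callback()`` hook above is the only error path from the ``asyncio`` side back to ``trio``; in isolation the pattern looks like this (plain ``asyncio``, no tractor required):

import asyncio


async def boom() -> None:
    raise RuntimeError('kaboom')


async def main() -> None:
    captured = []

    def on_done(task: asyncio.Task) -> None:
        # ``task.exception()`` itself re-raises ``CancelledError``,
        # which is why ``cancel_trio()`` above wraps it in try/except
        try:
            err = task.exception()
        except asyncio.CancelledError as cerr:
            err = cerr
        if err:
            captured.append(err)

    task = asyncio.create_task(boom())
    task.add_done_callback(on_done)

    await asyncio.gather(task, return_exceptions=True)
    await asyncio.sleep(0)  # let any pending done-callbacks run
    assert isinstance(captured[0], RuntimeError)


asyncio.run(main())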
async def run_task(
    func: Callable,
    *,
    qsize: int = 2**10,
    _treat_as_stream: bool = False,
    **kwargs,
) -> Any:
    """Run an ``asyncio`` async function or generator in a task, return
    or stream the result back to ``trio``.
    """
    # assert current_actor().is_infected_aio()

    # # ITC (inter task comms)
    # from_trio = asyncio.Queue(qsize)  # type: ignore
    # to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore

    # args = tuple(inspect.getfullargspec(func).args)

    # if getattr(func, '_tractor_stream_function', None):
    #     # the assumption is that if the target async routine accepts
    #     # the send channel then it intends to yield more than one
    #     # return value, otherwise it would just return ;P
    #     _treat_as_stream = True

    # # allow target func to accept/stream results manually by name
    # if 'to_trio' in args:
    #     kwargs['to_trio'] = to_trio
    # if 'from_trio' in args:
    #     kwargs['from_trio'] = from_trio

    # coro = func(**kwargs)

    # cancel_scope = trio.CancelScope()

    # # start the asyncio task we submitted from trio
    # if inspect.isawaitable(coro):
    #     task = asyncio.create_task(run_coro(to_trio, coro))
    # elif inspect.isasyncgen(coro):
    #     task = asyncio.create_task(consume_asyncgen(to_trio, coro))
    # else:
    #     raise TypeError(f"No support for invoking {coro}")

    # aio_err = None

    # def cancel_trio(task):
    #     """Cancel the calling ``trio`` task on error.
    #     """
    #     nonlocal aio_err
    #     aio_err = task.exception()
    #     if aio_err:
    #         log.exception(f"asyncio task errored:\n{aio_err}")
    #     cancel_scope.cancel()

    # task.add_done_callback(cancel_trio)

    # async iterator
    # if inspect.isasyncgen(coro) or _treat_as_stream:
    # if inspect.isasyncgenfunction(meth) or :
    if _treat_as_stream:
        task, from_aio, to_trio = _run_asyncio_task(
            func,
            qsize=2**8,
            **kwargs,
        )
        return from_aio

        # async def stream_results():
        #     try:
        #         with cancel_scope:
        #             # stream values upward
        #             async with from_aio:
        #                 async for item in from_aio:
        #                     yield item

        #         if cancel_scope.cancelled_caught:
        #             # always raise from any captured asyncio error
        #             if aio_err:
        #                 raise aio_err

        #     except BaseException as err:
        #         if aio_err is not None:
        #             # always raise from any captured asyncio error
        #             raise err from aio_err
        #         else:
        #             raise
        #     finally:
        #         # breakpoint()
        #         task.cancel()

        # return stream_results()

    # simple async func
    try:
        task, from_aio, to_trio = _run_asyncio_task(
            func,
            qsize=1,
            **kwargs,
        )

        # with cancel_scope:
        # async with from_aio:

        # return single value
        return await from_aio.receive()

        # if cancel_scope.cancelled_caught:
        #     # always raise from any captured asyncio error
        #     if aio_err:
        #         raise aio_err

    # Do we need this?
    except Exception as err:
        # await tractor.breakpoint()
        aio_err = from_aio._err

        # try:
        if aio_err is not None:
            # always raise from any captured asyncio error
            raise err from aio_err
        else:
            raise

    # finally:
    #     if not task.done():
    #         task.cancel()

    except trio.Cancelled:
        if not task.done():
            task.cancel()
        raise

# async def stream_from_task
#     pass
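Downstream, a ``trio`` task inside an infected actor would use this along the following lines (a sketch: it assumes the actor was spawned with ``infect_asyncio=True`` so the ``is_infected_aio()`` assertion holds; names are illustrative):

import asyncio

from tractor import to_asyncio


async def aio_add(x: int, y: int) -> int:
    # plain ``asyncio`` code scheduled on the host loop
    await asyncio.sleep(0.1)
    return x + y


async def trio_side() -> None:
    # wraps ``aio_add`` in an ``asyncio.Task`` and waits on the
    # bridging memory channel for its single return value
    result = await to_asyncio.run_task(aio_add, x=1, y=2)
    assert result == 3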
def run_as_asyncio_guest(
    trio_main: Callable,
) -> None:
    """Entry for an "infected ``asyncio`` actor".

    Uh, oh. :o

    It looks like your event loop has caught a case of the ``trio``s.

    :()

    Don't worry, we've heard you'll barely notice. You might hallucinate
    a few more propagating errors and feel like your digestion has
    slowed, but if anything gets too bad your parents will know about
    it.

    :)
    """
    async def aio_main(trio_main):
        loop = asyncio.get_running_loop()

        trio_done_fut = asyncio.Future()

        def trio_done_callback(main_outcome):
            log.info(f"trio_main finished: {main_outcome!r}")
            trio_done_fut.set_result(main_outcome)

        # start the infection: run trio on the asyncio loop in "guest mode"
        log.info(f"Infecting asyncio process with {trio_main}")
        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=trio_done_callback,
        )

        (await trio_done_fut).unwrap()

    # might as well if it's installed.
    try:
        import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    except ImportError:
        pass

    asyncio.run(aio_main(trio_main))
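And the guest-mode entry itself can be smoke-tested outside the actor machinery (a minimal sketch; the ``trio`` main here is illustrative):

import trio

from tractor import to_asyncio


async def trio_main() -> str:
    # ordinary trio code, hosted by the asyncio loop underneath
    await trio.sleep(0.1)
    return 'trio ran as an asyncio guest'


if __name__ == '__main__':
    to_asyncio.run_as_asyncio_guest(trio_main)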