Compare commits

...

53 Commits

Author SHA1 Message Date
Tyler Goodlet e1f128a79c Move debugger wait inside OCA nursery 2021-06-10 14:02:12 -04:00
Tyler Goodlet d85f4fda57 Add a multi-task streaming test 2021-06-10 14:00:09 -04:00
Tyler Goodlet 52f135d85d Avoid mutate on iterate race 2021-06-10 13:59:08 -04:00
Tyler Goodlet b29d2f7053 Only close recv chan if we get a ref 2021-06-10 13:58:06 -04:00
Tyler Goodlet 5960330413 Add error case 2021-06-10 13:57:16 -04:00
Tyler Goodlet 727a2084d6 Don't shield debugger status wait; it causes hangs 2021-06-02 08:24:59 -04:00
Tyler Goodlet f943ea0119 Drop bad .close() call 2021-06-02 08:22:51 -04:00
Tyler Goodlet cb2d2ed9d5 Proxy asyncio cancelleds as well 2021-05-31 09:29:45 -04:00
Tyler Goodlet 7b2543512c Power of 2 cuz puters 2021-05-31 09:29:45 -04:00
Tyler Goodlet 7d41492f53 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
2021-05-31 09:29:45 -04:00
Tyler Goodlet bc468d9140 WIP redo asyncio async gen streaming 2021-05-31 09:29:45 -04:00
Tyler Goodlet bf21c90461 Support asyncio actors with the trio spawner backend 2021-05-31 09:29:45 -04:00
Tyler Goodlet 316667a41f Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-05-31 09:29:45 -04:00
Tyler Goodlet ad9ed19bce Support asyncio actors with the trio spawner backend 2021-05-31 09:29:28 -04:00
Tyler Goodlet 98a69f06b4 Support no arg to `Context.started()` like trio 2021-05-31 09:28:07 -04:00
Tyler Goodlet df420fcafc Export portal type at top level 2021-05-31 09:28:07 -04:00
Tyler Goodlet 6834abb46f Link to SC on wikipedia 2021-05-31 09:28:07 -04:00
Tyler Goodlet 72099985be Add per actor debug mode toggle 2021-05-31 09:28:07 -04:00
Tyler Goodlet b1d3e97697 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-05-31 09:28:07 -04:00
Tyler Goodlet d6b16cabbe Pass func refs 2021-05-31 09:27:40 -04:00
Tyler Goodlet 0f8699a96a Shush the linter 2021-05-31 09:27:40 -04:00
Tyler Goodlet 43d2439335 Add initial infected asyncio error propagation test 2021-05-31 09:27:40 -04:00
Tyler Goodlet 6476df2c69 Raise any asyncio errors if in trio task on cancel 2021-05-31 09:27:40 -04:00
Tyler Goodlet 762e6ad2a2 Raise from asyncio error; fixes mypy 2021-05-31 09:27:40 -04:00
Tyler Goodlet 6ae4d8699e Tweak log msg 2021-05-31 09:27:40 -04:00
Tyler Goodlet dd0cfe2f5a Log error 2021-05-31 09:27:40 -04:00
Tyler Goodlet 1f3de88422 Support asyncio actors with the trio spawner backend 2021-05-31 09:27:40 -04:00
Tyler Goodlet c34d4b54fd Revert removal of `infect_asyncio` in nursery start methods 2021-05-31 09:27:40 -04:00
Tyler Goodlet aed60dd7e2 Attempt to make mypy happy.. 2021-05-31 09:27:40 -04:00
Tyler Goodlet 5f0ed8e7bd Add an obnoxious error message on internal failures 2021-05-31 09:27:40 -04:00
Tyler Goodlet a9985c0c01 Wow, fix all the broken async func invoking code..
Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
2021-05-31 09:27:40 -04:00
Tyler Goodlet 940774f215 Drop entrypoints from `Actor` 2021-05-31 09:27:40 -04:00
Tyler Goodlet db15162f04 Move asyncio guest mode entrypoint to `to_asyncio`
The function is useful if you want to run the "main process" under
`asyncio`. Until `trio` core wraps this better we'll keep our own copy
in the interim (there's a new "inside-out-guest" mode almost on
mainline so hang tight).
2021-05-31 09:27:40 -04:00
Tyler Goodlet 9d4dba23a6 Propagate any spawned `asyncio` task error upwards
This should mostly maintain top level SC principles for any task spawned
using `tractor.to_asyncio.run()`. When the `asyncio` task completes make
sure to cancel the pertaining `trio` cancel scope and raise any error
that may have resulted.

Resolves #120
2021-05-31 09:27:40 -04:00
Tyler Goodlet 24c6a65873 Add a @pub kwarg to allow specifying a "startup response message" 2021-05-31 09:27:10 -04:00
Tyler Goodlet 35465a5438 Catch and delay errors in the root if debugger is active 2021-05-31 09:26:26 -04:00
Tyler Goodlet 5b3f554f57 Don't shield on root cancel it can causes hangs 2021-05-31 09:26:17 -04:00
Tyler Goodlet 0d0e0bc2da Experiment: only disable SIGINT handling in children 2021-05-31 09:26:05 -04:00
Tyler Goodlet 19825ba284 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
2021-05-31 09:25:47 -04:00
Tyler Goodlet 682ef3425f Add debug example that causes pdb stdin clobbering 2021-05-31 09:25:47 -04:00
Tyler Goodlet 4a16f03fe8 Fix up var naming and typing 2021-05-31 09:25:38 -04:00
Tyler Goodlet 63e13aefdb Only send stop msg if not received from far end 2021-05-31 09:25:38 -04:00
Tyler Goodlet bfa4410133 Expose msg stream types at top level 2021-05-31 09:25:38 -04:00
Tyler Goodlet 6f2fb9e47c Add dynamic pubsub test using new bidir stream apis 2021-05-31 09:25:38 -04:00
Tyler Goodlet 420cd8de13 Use context for remote debugger locking
A context is the natural fit (vs. a receive stream) for locking the root
proc's tty usage via it's `.started()` sync point. Simplify the
`_breakpoin()` routine to be a simple async func instead of all this
"returning a coroutine" stuff from before we decided that
`tractor.breakpoint()` must be async. Use `runtime` level for locking
logging making it easier to trace.
2021-05-31 09:25:38 -04:00
Tyler Goodlet 01bef653c2 Be more pedantic with error handling 2021-05-31 09:25:38 -04:00
Tyler Goodlet 899404932a Fix typing 2021-05-31 09:25:38 -04:00
Tyler Goodlet 49deef0d71 Parametrize with async for style tests 2021-05-31 09:25:38 -04:00
Tyler Goodlet e46ef8ae3f Support passing `shield` at stream contruction 2021-05-31 09:25:38 -04:00
Tyler Goodlet b5116c5a51 Add basic test set 2021-05-31 09:25:38 -04:00
Tyler Goodlet c6e52b0387 Cancel scope on stream consumer completion 2021-05-31 09:25:38 -04:00
Tyler Goodlet f792aa06b5 Expose `@context` decorator at top level 2021-05-31 09:25:38 -04:00
Tyler Goodlet 15fa777ddf Add initial bi-directional streaming
This mostly adds the api described in
https://github.com/goodboy/tractor/issues/53#issuecomment-806258798

The first draft summary:
- formalize bidir steaming using the `trio.Channel` style interface
  which we derive as a `MsgStream` type.
- add `Portal.open_context()` which provides a `trio.Nursery.start()`
  remote task invocation style for setting up and tearing down tasks
  contexts in remote actors.
- add a distinct `'started'` message to the ipc protocol to facilitate
  `Context.start()` with a first return value.
- for our `ReceiveMsgStream` type, don't cancel the remote task in
  `.aclose()`; this is now done explicitly by the surrounding `Context`
   usage: `Context.cancel()`.
- streams in either direction still use a `'yield'` message keeping the
  proto mostly symmetric without having to worry about which side is the
  caller / portal opener.
- subtlety: only allow sending a `'stop'` message during a 2-way
  streaming context from `ReceiveStream.aclose()`, detailed comment
  with explanation is included.

Relates to #53
2021-05-31 09:25:38 -04:00
18 changed files with 1548 additions and 343 deletions

View File

@ -13,7 +13,7 @@
``tractor`` is a `structured concurrent`_ "`actor model`_" built on trio_ and multi-processing_.
We pair structured concurrency and true multi-core parallelism with
We pair `structured concurrency`_ and true multi-core parallelism with
the aim of being the multi-processing framework *you always wanted*.
The first step to grok ``tractor`` is to get the basics of ``trio`` down.
@ -285,7 +285,7 @@ channel`_!
.. _messages: https://en.wikipedia.org/wiki/Message_passing
.. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/

View File

@ -0,0 +1,31 @@
import trio
import tractor
async def key_error():
"Raise a ``NameError``"
return {}['doggy']
async def main():
"""Root dies
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='debug'
) as n:
# spawn both actors
portal = await n.run_in_actor(key_error)
# XXX: originally a bug causes by this
# where root would enter debugger even
# though child should have it locked.
with trio.fail_after(1):
await trio.Event().wait()
if __name__ == '__main__':
trio.run(main)

212
tests/test_2way.py 100644
View File

@ -0,0 +1,212 @@
"""
Bidirectional streaming and context API.
"""
import pytest
import trio
import tractor
# from conftest import tractor_test
# TODO: test endofchannel semantics / cancellation / error cases:
# 3 possible outcomes:
# - normal termination: far end relays a stop message with
# final value as in async gen from ``return <val>``.
# possible outcomes:
# - normal termination: far end returns
# - premature close: far end relays a stop message to tear down stream
# - cancel: far end raises `ContextCancelled`
# future possible outcomes
# - restart request: far end raises `ContextRestart`
_state: bool = False
@tractor.context
async def simple_setup_teardown(
ctx: tractor.Context,
data: int,
) -> None:
# startup phase
global _state
_state = True
# signal to parent that we're up
await ctx.started(data + 1)
try:
# block until cancelled
await trio.sleep_forever()
finally:
_state = False
async def assert_state(value: bool):
global _state
assert _state == value
@pytest.mark.parametrize(
'error_parent',
[False, True],
)
def test_simple_context(error_parent):
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'simple_context',
enable_modules=[__name__],
)
async with portal.open_context(
simple_setup_teardown,
data=10,
) as (ctx, sent):
assert sent == 11
await portal.run(assert_state, value=True)
# after cancellation
await portal.run(assert_state, value=False)
if error_parent:
raise ValueError
# shut down daemon
await portal.cancel_actor()
if error_parent:
try:
trio.run(main)
except ValueError:
pass
else:
trio.run(main)
@tractor.context
async def simple_rpc(
ctx: tractor.Context,
data: int,
) -> None:
"""Test a small ping-pong server.
"""
# signal to parent that we're up
await ctx.started(data + 1)
print('opening stream in callee')
async with ctx.open_stream() as stream:
count = 0
while True:
try:
await stream.receive() == 'ping'
except trio.EndOfChannel:
assert count == 10
break
else:
print('pong')
await stream.send('pong')
count += 1
@tractor.context
async def simple_rpc_with_forloop(
ctx: tractor.Context,
data: int,
) -> None:
"""Same as previous test but using ``async for`` syntax/api.
"""
# signal to parent that we're up
await ctx.started(data + 1)
print('opening stream in callee')
async with ctx.open_stream() as stream:
count = 0
async for msg in stream:
assert msg == 'ping'
print('pong')
await stream.send('pong')
count += 1
else:
assert count == 10
@pytest.mark.parametrize(
'use_async_for',
[True, False],
)
@pytest.mark.parametrize(
'server_func',
[simple_rpc, simple_rpc_with_forloop],
)
def test_simple_rpc(server_func, use_async_for):
"""The simplest request response pattern.
"""
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'rpc_server',
enable_modules=[__name__],
)
async with portal.open_context(
server_func, # taken from pytest parameterization
data=10,
) as (ctx, sent):
assert sent == 11
async with ctx.open_stream() as stream:
if use_async_for:
count = 0
# receive msgs using async for style
print('ping')
await stream.send('ping')
async for msg in stream:
assert msg == 'pong'
print('ping')
await stream.send('ping')
count += 1
if count >= 9:
break
else:
# classic send/receive style
for _ in range(10):
print('ping')
await stream.send('ping')
assert await stream.receive() == 'pong'
# stream should terminate here
await portal.cancel_actor()
trio.run(main)

View File

@ -0,0 +1,220 @@
"""
Advanced streaming patterns using bidirectional streams and contexts.
"""
import itertools
from typing import Set, Dict, List
import trio
import tractor
_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = {
'even': set(),
'odd': set(),
}
async def publisher(
seed: int = 0,
) -> None:
global _registry
def is_even(i):
return i % 2 == 0
for val in itertools.count(seed):
sub = 'even' if is_even(val) else 'odd'
for sub_stream in _registry[sub]:
await sub_stream.send(val)
# throttle send rate to ~4Hz
# making it readable to a human user
await trio.sleep(1/4)
@tractor.context
async def subscribe(
ctx: tractor.Context,
) -> None:
global _registry
# syn caller
await ctx.started(None)
async with ctx.open_stream() as stream:
# update subs list as consumer requests
async for new_subs in stream:
new_subs = set(new_subs)
remove = new_subs - _registry.keys()
print(f'setting sub to {new_subs} for {ctx.chan.uid}')
# remove old subs
for sub in remove:
_registry[sub].remove(stream)
# add new subs for consumer
for sub in new_subs:
_registry[sub].add(stream)
async def consumer(
subs: List[str],
) -> None:
uid = tractor.current_actor().uid
async with tractor.wait_for_actor('publisher') as portal:
async with portal.open_context(subscribe) as (ctx, first):
async with ctx.open_stream() as stream:
# flip between the provided subs dynamically
if len(subs) > 1:
for sub in itertools.cycle(subs):
print(f'setting dynamic sub to {sub}')
await stream.send([sub])
count = 0
async for value in stream:
print(f'{uid} got: {value}')
if count > 5:
break
count += 1
else: # static sub
await stream.send(subs)
async for value in stream:
print(f'{uid} got: {value}')
def test_dynamic_pub_sub():
global _registry
from multiprocessing import cpu_count
cpus = cpu_count()
async def main():
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
await n.run_in_actor(publisher)
for i, sub in zip(
range(cpus - 2),
itertools.cycle(_registry.keys())
):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
)
# make one dynamic subscriber
await n.run_in_actor(
consumer,
name='consumer_dynamic',
subs=list(_registry.keys()),
)
# block until cancelled by user
with trio.fail_after(10):
await trio.sleep_forever()
try:
trio.run(main)
except trio.TooSlowError:
pass
@tractor.context
async def one_task_streams_and_one_handles_reqresp(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async def pingpong():
'''Run a simple req/response service.
'''
async for msg in stream:
print('rpc server ping')
assert msg == 'ping'
print('rpc server pong')
await stream.send('pong')
async with trio.open_nursery() as n:
n.start_soon(pingpong)
for _ in itertools.count():
await stream.send('yo')
await trio.sleep(0.01)
def test_reqresp_ontopof_streaming():
'''Test a subactor that both streams with one task and
spawns another which handles a small requests-response
dialogue over the same bidir-stream.
'''
async def main():
with trio.move_on_after(2):
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
portal = await n.start_actor(
'dual_tasks',
enable_modules=[__name__]
)
# flat to make sure we get at least one pong
got_pong: bool = False
async with portal.open_context(
one_task_streams_and_one_handles_reqresp,
) as (ctx, first):
assert first is None
async with ctx.open_stream() as stream:
await stream.send('ping')
async for msg in stream:
print(f'client received: {msg}')
assert msg in {'pong', 'yo'}
if msg == 'pong':
got_pong = True
await stream.send('ping')
print('client sent ping')
assert got_pong
try:
trio.run(main)
except trio.TooSlowError:
pass

View File

@ -0,0 +1,24 @@
import asyncio
import pytest
import tractor
async def sleep_and_err():
await asyncio.sleep(0.1)
assert 0
async def asyncio_actor():
assert tractor.current_actor().is_infected_aio()
await tractor.to_asyncio.run_task(sleep_and_err)
def test_infected_simple_error(arb_addr):
async def main():
async with tractor.open_nursery() as n:
await n.run_in_actor(asyncio_actor, infected_asyncio=True)
with pytest.raises(tractor.RemoteActorError) as excinfo:
tractor.run(main, arbiter_addr=arb_addr)

View File

@ -338,6 +338,8 @@ async def test_respawn_consumer_task(
print("all values streamed, BREAKING")
break
cs.cancel()
# TODO: this is justification for a
# ``ActorNursery.stream_from_actor()`` helper?
await portal.cancel_actor()

View File

@ -5,7 +5,13 @@ tractor: An actor model micro-framework built on
from trio import MultiError
from ._ipc import Channel
from ._streaming import Context, stream
from ._streaming import (
Context,
ReceiveMsgStream,
MsgStream,
stream,
context,
)
from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._trionics import open_nursery
from ._state import current_actor, is_root_process
@ -13,6 +19,7 @@ from ._exceptions import RemoteActorError, ModuleNotExposed
from ._debug import breakpoint, post_mortem
from . import msg
from ._root import run, run_daemon, open_root_actor
from ._portal import Portal
__all__ = [
@ -29,11 +36,12 @@ __all__ = [
'msg',
'open_nursery',
'open_root_actor',
'Portal',
'post_mortem',
'run',
'run_daemon',
'stream',
'wait_for_actor',
'context',
'to_asyncio',
'wait_for_actor',
]

View File

@ -1,5 +1,6 @@
"""
Actor primitives and helpers
"""
from collections import defaultdict
from functools import partial
@ -11,9 +12,11 @@ import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional, Union
from types import ModuleType
import signal
import sys
import os
from contextlib import ExitStack
import warnings
import trio # type: ignore
from trio_typing import TaskStatus
@ -57,13 +60,37 @@ async def _invoke(
treat_as_gen = False
cs = None
cancel_scope = trio.CancelScope()
ctx = Context(chan, cid, cancel_scope)
ctx = Context(chan, cid, _cancel_scope=cancel_scope)
context = False
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
params = sig.parameters
# compat with old api
kwargs['ctx'] = ctx
if 'ctx' in params:
warnings.warn(
"`@tractor.stream decorated funcs should now declare "
"a `stream` arg, `ctx` is now designated for use with "
"@tractor.context",
DeprecationWarning,
stacklevel=2,
)
elif 'stream' in params:
assert 'stream' in params
kwargs['stream'] = ctx
treat_as_gen = True
elif getattr(func, '_tractor_context_function', False):
# handle decorated ``@tractor.context`` async function
kwargs['ctx'] = ctx
context = True
# errors raised inside this block are propgated back to caller
try:
if not (
@ -101,8 +128,9 @@ async def _invoke(
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': True, 'cid': cid})
else:
if treat_as_gen:
# one way @stream func that gets treated like an async gen
elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
@ -111,10 +139,24 @@ async def _invoke(
with cancel_scope as cs:
task_status.started(cs)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
with cancel_scope as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
# if cs.cancelled_caught:
# # task was cancelled so relay to the cancel to caller
# await chan.send({'return': await coro, 'cid': cid})
else:
# regular async function
await chan.send({'functype': 'asyncfunc', 'cid': cid})
@ -195,6 +237,9 @@ class Actor:
_parent_main_data: Dict[str, str]
_parent_chan_cs: Optional[trio.CancelScope] = None
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
def __init__(
self,
name: str,
@ -262,7 +307,7 @@ class Actor:
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
async def wait_for_peer(
self, uid: Tuple[str, str]
@ -404,10 +449,10 @@ class Actor:
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
assert send_chan.cid == cid # type: ignore
if 'stop' in msg:
log.debug(f"{send_chan} was terminated at remote end")
# indicate to consumer that far end has stopped
return await send_chan.aclose()
# if 'stop' in msg:
# log.debug(f"{send_chan} was terminated at remote end")
# # indicate to consumer that far end has stopped
# return await send_chan.aclose()
try:
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
@ -415,6 +460,12 @@ class Actor:
await send_chan.send(msg)
except trio.BrokenResourceError:
# TODO: what is the right way to handle the case where the
# local task has already sent a 'stop' / StopAsyncInteration
# to the other side but and possibly has closed the local
# feeder mem chan? Do we wait for some kind of ack or just
# let this fail silently and bubble up (currently)?
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed")
@ -428,7 +479,7 @@ class Actor:
try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
except KeyError:
send_chan, recv_chan = trio.open_memory_channel(1000)
send_chan, recv_chan = trio.open_memory_channel(2*10)
send_chan.cid = cid # type: ignore
recv_chan.cid = cid # type: ignore
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
@ -477,11 +528,14 @@ class Actor:
task_status.started(loop_cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks:
for (channel, cid) in self._rpc_tasks.copy():
if channel is chan:
await self._cancel_task(cid, channel)
log.debug(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
@ -494,6 +548,7 @@ class Actor:
if cid:
# deliver response to local caller/waiter
await self._push_result(chan, cid, msg)
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
@ -635,6 +690,12 @@ class Actor:
for attr, value in parent_data.items():
setattr(self, attr, value)
# Disable sigint handling in children if NOT running in
# debug mode; we shouldn't need it thanks to our
# cancellation machinery.
if 'debug_mode' not in rvs:
signal.signal(signal.SIGINT, signal.SIG_IGN)
return chan, accept_addr
except OSError: # failed to connect
@ -1021,6 +1082,9 @@ class Actor:
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
def is_infected_aio(self) -> bool:
return self._infected_aio
class Arbiter(Actor):
"""A special actor who knows all the other actors and always has

View File

@ -19,12 +19,15 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr)
parser.add_argument("--asyncio", action='store_true')
args = parser.parse_args()
subactor = Actor(
@ -36,5 +39,6 @@ if __name__ == "__main__":
_trio_main(
subactor,
parent_addr=args.parent_addr
parent_addr=args.parent_addr,
infect_asyncio=args.asyncio,
)

View File

@ -1,13 +1,13 @@
"""
Multi-core debugging for da peeps!
"""
import bdb
import sys
from functools import partial
from contextlib import asynccontextmanager
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
from typing import Tuple, Optional, Callable, AsyncIterator
from async_generator import aclosing
import tractor
import trio
@ -31,14 +31,21 @@ log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# TODO: wrap all these in a static global class: ``DebugLock`` maybe?
# placeholder for function to set a ``trio.Event`` on debugger exit
_pdb_release_hook: Optional[Callable] = None
# actor-wide variable pointing to current task name using debugger
_in_debug = False
_local_task_in_debug: Optional[str] = None
# actor tree-wide actor uid that supposedly has the tty lock
_global_actor_in_debug: Optional[Tuple[str, str]] = None
# lock in root actor preventing multi-access to local tty
_debug_lock = trio.StrictFIFOLock()
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_pdb_complete: Optional[trio.Event] = None
# XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message
@ -61,19 +68,19 @@ class PdbwTeardown(pdbpp.Pdb):
# TODO: figure out how to dissallow recursive .set_trace() entry
# since that'll cause deadlock for us.
def set_continue(self):
global _in_debug
try:
super().set_continue()
finally:
_in_debug = False
global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook()
def set_quit(self):
global _in_debug
try:
super().set_quit()
finally:
_in_debug = False
global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook()
@ -119,17 +126,21 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
"""Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering by multiple processes.
"""
global _debug_lock, _global_actor_in_debug
task_name = trio.lowlevel.current_task().name
try:
log.debug(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
await _debug_lock.acquire()
async with _debug_lock:
# _debug_lock._uid = uid
_global_actor_in_debug = uid
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
yield
finally:
_debug_lock.release()
_global_actor_in_debug = None
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
@ -144,78 +155,100 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
# signal.signal(signal.SIGINT, prior_handler)
@tractor.context
async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> AsyncIterator[str]:
# TODO: when we get to true remote debugging
# this will deliver stdin data
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
async with _acquire_debug_lock(subactor_uid):
log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# with _disable_sigint():
ctx: tractor.Context,
subactor_uid: Tuple[str, str]
) -> None:
global _pdb_complete
task_name = trio.lowlevel.current_task().name
# TODO: when we get to true remote debugging
# this will deliver stdin data?
log.debug(
"Attempting to acquire TTY lock, "
f"remote task: {task_name}:{subactor_uid}"
)
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
async with _acquire_debug_lock(subactor_uid):
with trio.CancelScope(shield=True):
# indicate to child that we've locked stdio
yield 'Locked'
await ctx.started('Locked')
log.runtime( # type: ignore
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# wait for cancellation of stream by child
# indicating debugger is dis-engaged
await trio.sleep_forever()
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
assert await stream.receive() == 'Unlock'
log.debug(
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
# XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]:
async def _breakpoint(debug_func) -> None:
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors.
"""
actor = tractor.current_actor()
do_unlock = trio.Event()
task_name = trio.lowlevel.current_task().name
global _pdb_complete, _pdb_release_hook
global _local_task_in_debug, _global_actor_in_debug
async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED
):
global _debugger_request_cs
with trio.CancelScope() as cs:
_debugger_request_cs = cs
try:
async with get_root() as portal:
async with portal.open_stream_from(
# this syncs to child's ``Context.started()`` call.
async with portal.open_context(
tractor._debug._hijack_stdin_relay_to_child,
subactor_uid=actor.uid,
) as stream:
# block until first yield above
async for val in stream:
) as (ctx, val):
assert val == 'Locked'
async with ctx.open_stream() as stream:
# unblock local caller
task_status.started()
# with trio.CancelScope(shield=True):
await do_unlock.wait()
await _pdb_complete.wait()
await stream.send('Unlock')
# trigger cancellation of remote stream
break
finally:
log.debug(f"Exiting debugger for actor {actor}")
global _in_debug
_in_debug = False
global _local_task_in_debug
_local_task_in_debug = None
log.debug(f"Child {actor} released parent stdio lock")
async def _bp():
"""Async breakpoint which schedules a parent stdio lock, and once complete
enters the ``pdbpp`` debugging console.
"""
task_name = trio.lowlevel.current_task().name
global _in_debug
if not _pdb_complete or _pdb_complete.is_set():
_pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process():
if _in_debug:
if _in_debug == task_name:
if _local_task_in_debug:
if _local_task_in_debug == task_name:
# this task already has the lock and is
# likely recurrently entering a breakpoint
return
@ -223,18 +256,17 @@ def _breakpoint(debug_func) -> Awaitable[None]:
# if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()`
log.warning(
f"Actor {actor.uid} already has a debug lock, waiting...")
await do_unlock.wait()
await trio.sleep(0.1)
log.warning(f"{actor.uid} already has a debug lock, waiting...")
# assign unlock callback for debugger teardown hooks
global _pdb_release_hook
_pdb_release_hook = do_unlock.set
await _pdb_complete.wait()
await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
_in_debug = task_name
_local_task_in_debug = task_name
# assign unlock callback for debugger teardown hooks
_pdb_release_hook = _pdb_complete.set
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
@ -242,21 +274,42 @@ def _breakpoint(debug_func) -> Awaitable[None]:
await actor._service_n.start(wait_for_parent_stdin_hijack)
elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore
global _debug_lock
# TODO: wait, what about multiple root tasks acquiring
# it though.. shrug?
# root process (us) already has it; ignore
if _global_actor_in_debug == actor.uid:
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release
_global_actor_in_debug = actor.uid
_local_task_in_debug = task_name
# the lock must be released on pdb completion
def teardown():
global _pdb_complete, _debug_lock
global _global_actor_in_debug, _local_task_in_debug
_debug_lock.release()
_global_actor_in_debug = None
_local_task_in_debug = None
_pdb_complete.set()
_pdb_release_hook = teardown
# block here one (at the appropriate frame *up* where
# ``breakpoint()`` was awaited and begin handling stdio
log.debug("Entering the synchronous world of pdb")
debug_func(actor)
# user code **must** await this!
return _bp()
def _mk_pdb():
# XXX: setting these flags on the pdb instance are absolutely
@ -276,7 +329,7 @@ def _set_trace(actor=None):
pdb = _mk_pdb()
if actor is not None:
log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n")
log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore
pdb.set_trace(
# start 2 levels up in user code
@ -285,8 +338,8 @@ def _set_trace(actor=None):
else:
# we entered the global ``breakpoint()`` built-in from sync code
global _in_debug, _pdb_release_hook
_in_debug = 'sync'
global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = 'sync'
def nuttin():
pass

View File

@ -3,23 +3,24 @@ Sub-process entry points.
"""
from functools import partial
from typing import Tuple, Any
import signal
import trio # type: ignore
from .log import get_console_log, get_logger
from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__)
def _mp_main(
actor: 'Actor', # type: ignore
actor: 'Actor', # type: ignore # noqa
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
@ -45,6 +46,10 @@ def _mp_main(
parent_addr=parent_addr
)
try:
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
@ -54,15 +59,15 @@ def _mp_main(
def _trio_main(
actor: 'Actor', # type: ignore
actor: 'Actor', # type: ignore # noqa
*,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
"""
# Disable sigint handling in children;
# we don't need it thanks to our cancellation machinery.
signal.signal(signal.SIGINT, signal.SIG_IGN)
log.info(f"Started new trio process for {actor.uid}")
log.info(f"Started new trio process for {actor.uid}")
@ -83,6 +88,10 @@ def _trio_main(
)
try:
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI")

View File

@ -312,11 +312,20 @@ class Portal:
ctx = Context(self.channel, cid, _portal=self)
try:
async with ReceiveMsgStream(ctx, recv_chan, self) as rchan:
# deliver receive only stream
async with ReceiveMsgStream(ctx, recv_chan) as rchan:
self._streams.add(rchan)
yield rchan
finally:
# cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
# this ``.open_fream_from()`` api, the stream is one a one
# time use and we couple the far end tasks's lifetime to
# the consumer's scope; we don't ever send a `'stop'`
# message right now since there shouldn't be a reason to
# stop and restart the stream, right?
try:
await ctx.cancel()
except trio.ClosedResourceError:
@ -326,17 +335,64 @@ class Portal:
self._streams.remove(rchan)
# @asynccontextmanager
# async def open_context(
# self,
# func: Callable,
# **kwargs,
# ) -> Context:
# # TODO
# elif resptype == 'context': # context manager style setup/teardown
# # TODO likely not here though
# raise NotImplementedError
@asynccontextmanager
async def open_context(
self,
func: Callable,
**kwargs,
) -> AsyncGenerator[Tuple[Context, Any], None]:
"""Open an inter-actor task context.
This is a synchronous API which allows for deterministic
setup/teardown of a remote task. The yielded ``Context`` further
allows for opening bidirectional streams - see
``Context.open_stream()``.
"""
# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)
):
raise TypeError(
f'{func} must be an async generator function!')
fn_mod_path, fn_name = func_deats(func)
recv_chan: trio.ReceiveMemoryChannel = None
try:
cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs)
assert functype == 'context'
msg = await recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first = msg['started']
except KeyError:
assert msg.get('cid'), ("Received internal error at context?")
if msg.get('error'):
# raise the error message
raise unpack_error(msg, self.channel)
else:
raise
# deliver context instance and .started() msg value in open
# tuple.
ctx = Context(self.channel, cid, _portal=self)
try:
yield ctx, first
finally:
await ctx.cancel()
finally:
if recv_chan is not None:
await recv_chan.aclose()
@dataclass
class LocalPortal:

View File

@ -174,7 +174,6 @@ async def open_root_actor(
finally:
logger.info("Shutting down root actor")
with trio.CancelScope(shield=True):
await actor.cancel()
finally:
_state._current_actor = None
@ -227,7 +226,7 @@ def run(
def run_daemon(
rpc_module_paths: List[str],
enable_modules: List[str],
**kwargs
) -> None:
"""Spawn daemon actor which will respond to RPC.
@ -236,9 +235,9 @@ def run_daemon(
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests.
"""
kwargs['rpc_module_paths'] = list(rpc_module_paths)
kwargs['enable_modules'] = list(enable_modules)
for path in rpc_module_paths:
for path in enable_modules:
importlib.import_module(path)
return run(partial(trio.sleep, float('inf')), **kwargs)

View File

@ -22,7 +22,13 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._state import current_actor, is_main_process
from ._state import (
current_actor,
is_main_process,
is_root_process,
_runtime_vars,
)
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
@ -153,6 +159,7 @@ async def cancel_on_completion(
async def spawn_subactor(
subactor: 'Actor',
parent_addr: Tuple[str, int],
infect_asyncio: bool,
):
spawn_cmd = [
sys.executable,
@ -177,26 +184,52 @@ async def spawn_subactor(
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
proc = await trio.open_process(spawn_cmd)
try:
yield proc
finally:
log.debug(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early
# since trio does this internally on ``__aexit__()``
# NOTE: we always "shield" join sub procs in
# the outer scope since no actor zombies are
# ever allowed. This ``__aexit__()`` also shields
# internally.
log.debug(f"Attempting to kill {proc}")
if (
is_root_process()
# NOTE: this timeout effectively does nothing right now since
# we are shielding the ``.wait()`` inside ``new_proc()`` which
# will pretty much never release until the process exits.
# XXX: basically the pre-closing of stdstreams in a
# root-processe's ``trio.Process.aclose()`` can clobber
# any existing debugger session so we avoid
and _runtime_vars['_debug_mode']
):
# XXX: this is ``trio.Process.aclose()`` minus
# the std-streams pre-closing steps and ``Process.kill()``
# calls.
try:
await proc.wait()
finally:
if proc.returncode is None:
# XXX: skip this when in debug and a session might
# still be live
# proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
else:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs:
async with proc:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
log.critical(f"HARD KILLING {proc}")
proc.kill()
@ -212,7 +245,7 @@ async def new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
@ -223,11 +256,12 @@ async def new_proc(
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
if use_trio_run_in_process or _spawn_method == 'trio':
if _spawn_method == 'trio':
async with trio.open_nursery() as nursery:
async with spawn_subactor(
subactor,
parent_addr,
infect_asyncio=infect_asyncio
) as proc:
log.info(f"Started {proc}")
@ -305,6 +339,7 @@ async def new_proc(
bind_addr=bind_addr,
parent_addr=parent_addr,
_runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status,
)
@ -320,7 +355,7 @@ async def mp_new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@ -366,6 +401,7 @@ async def mp_new_proc(
fs_info,
start_method,
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,

View File

@ -1,19 +1,211 @@
"""
Message stream types and APIs.
"""
import inspect
from contextlib import contextmanager # , asynccontextmanager
from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass
from typing import Any, Iterator, Optional
from typing import (
Any, Iterator, Optional, Callable,
AsyncGenerator,
)
import warnings
import trio
from ._ipc import Channel
from ._exceptions import unpack_error
from ._state import current_actor
from .log import get_logger
log = get_logger(__name__)
# TODO: generic typing like trio's receive channel
# but with msgspec messages?
# class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
class ReceiveMsgStream(trio.abc.ReceiveChannel):
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
special behaviour for signalling stream termination across an
inter-actor ``Channel``. This is the type returned to a local task
which invoked a remote streaming function using `Portal.run()`.
Termination rules:
- if the local task signals stop iteration a cancel signal is
relayed to the remote task indicating to stop streaming
- if the remote task signals the end of a stream, raise a
``StopAsyncIteration`` to terminate the local ``async for``
"""
def __init__(
self,
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.abc.ReceiveChannel,
shield: bool = False,
) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._shielded = shield
# delegate directly to underlying mem channel
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
return msg['yield']
async def receive(self):
try:
msg = await self._rx_chan.receive()
return msg['yield']
except KeyError:
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10 match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if msg.get('stop'):
log.debug(f"{self} was stopped at remote end")
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
raise trio.EndOfChannel
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, self._ctx.chan)
else:
raise
except (trio.ClosedResourceError, StopAsyncIteration):
# XXX: this indicates that a `stop` message was
# sent by the far side of the underlying channel.
# Currently this is triggered by calling ``.aclose()`` on
# the send side of the channel inside
# ``Actor._push_result()``, but maybe it should be put here?
# to avoid exposing the internal mem chan closing mechanism?
# in theory we could instead do some flushing of the channel
# if needed to ensure all consumers are complete before
# triggering closure too early?
# Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically.
# We **don't** want to be closing this send channel and not
# relaying a final value to remaining consumers who may not
# have been scheduled to receive it yet?
# lots of testing to do here
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
# await self._ctx.send_stop()
raise StopAsyncIteration
except trio.Cancelled:
# relay cancels to the remote task
await self.aclose()
raise
@contextmanager
def shield(
self
) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
Note that here, "shielding" here guards against relaying
a ``'stop'`` message to the far end of the stream thus keeping
the stream machinery active and ready for further use, it does
not have anything to do with an internal ``trio.CancelScope``.
"""
self._shielded = True
yield self
self._shielded = False
async def aclose(self):
"""Cancel associated remote actor task and local memory channel
on close.
"""
# TODO: proper adherance to trio's `.aclose()` semantics:
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan
if rx_chan._closed:
log.warning(f"{self} is already closed")
return
# TODO: broadcasting to multiple consumers
# stats = rx_chan.statistics()
# if stats.open_receive_channels > 1:
# # if we've been cloned don't kill the stream
# log.debug(
# "there are still consumers running keeping stream alive")
# return
if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return
# NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're
# in bidirectional mode. If we're only streaming
# *from* one side then that side **won't** have an
# entry in `Actor._cids2qs` (maybe it should though?).
# So any `yield` or `stop` msgs sent from the caller side
# will cause key errors on the callee side since there is
# no entry for a local feeder mem chan since the callee task
# isn't expecting messages to be sent by the caller.
# Thus, we must check that this context DOES NOT
# have a portal reference to ensure this is indeed the callee
# side and can relay a 'stop'. In the bidirectional case,
# `Context.open_stream()` will create the `Actor._cids2qs`
# entry from a call to `Actor.get_memchans()`.
if not self._ctx._portal:
# only for 2 way streams can we can send
# stop from the caller side
await self._ctx.send_stop()
# close the local mem chan
await rx_chan.aclose()
# TODO: but make it broadcasting to consumers
# def clone(self):
# """Clone this receive channel allowing for multi-task
# consumption from the same channel.
# """
# return ReceiveStream(
# self._cid,
# self._rx_chan.clone(),
# self._portal,
# )
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
"""
Bidirectional message stream for use within an inter-actor actor
``Context```.
"""
async def send(
self,
data: Any
) -> None:
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
@dataclass(frozen=True)
class Context:
"""An IAC (inter-actor communication) context.
@ -31,6 +223,10 @@ class Context:
chan: Channel
cid: str
# TODO: should we have seperate types for caller vs. callee
# side contexts? The caller always opens a portal whereas the callee
# is always responding back through a context-stream
# only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa
@ -57,8 +253,11 @@ class Context:
timeout quickly to sidestep 2-generals...
"""
assert self._portal, (
"No portal found, this is likely a callee side context")
if self._portal: # caller side:
if not self._portal:
raise RuntimeError(
"No portal found, this is likely a callee side context"
)
cid = self.cid
with trio.move_on_after(0.5) as cs:
@ -76,27 +275,88 @@ class Context:
# XXX: there's no way to know if the remote task was indeed
# cancelled in the case where the connection is broken or
# some other network error occurred.
if not self._portal.channel.connected():
# if not self._portal.channel.connected():
if not self.chan.connected():
log.warning(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
else:
# ensure callee side
assert self._cancel_scope
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough?
# This probably gets into the discussion in
# https://github.com/goodboy/tractor/issues/36
self._cancel_scope.cancel()
# TODO: do we need a restart api?
# async def restart(self) -> None:
# # TODO
# pass
# @asynccontextmanager
# async def open_stream(
# self,
# ) -> AsyncContextManager:
# # TODO
# pass
@asynccontextmanager
async def open_stream(
self,
shield: bool = False,
) -> AsyncGenerator[MsgStream, None]:
# TODO
actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.send_cmd()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
_, recv_chan = actor.get_memchans(
self.chan.uid,
self.cid
)
async with MsgStream(
ctx=self,
rx_chan=recv_chan,
shield=shield,
) as rchan:
if self._portal:
self._portal._streams.add(rchan)
try:
yield rchan
except trio.EndOfChannel:
raise
else:
# signal ``StopAsyncIteration`` on far end.
await self.send_stop()
finally:
if self._portal:
self._portal._streams.remove(rchan)
async def started(self, value: Optional[Any] = None) -> None:
if self._portal:
raise RuntimeError(
f"Caller side context {self} can not call started!")
await self.chan.send({'started': value, 'cid': self.cid})
def stream(func):
def stream(func: Callable) -> Callable:
"""Mark an async function as a streaming routine with ``@stream``.
"""
func._tractor_stream_function = True
# annotate
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_stream_function = True # type: ignore
sig = inspect.signature(func)
params = sig.parameters
if 'stream' not in params and 'ctx' in params:
@ -114,147 +374,26 @@ def stream(func):
):
raise TypeError(
"The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`"
f"{func.__name__} must be `ctx: tractor.Context` "
"(Or ``to_trio`` if using ``asyncio`` in guest mode)."
)
return func
class ReceiveMsgStream(trio.abc.ReceiveChannel):
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
special behaviour for signalling stream termination across an
inter-actor ``Channel``. This is the type returned to a local task
which invoked a remote streaming function using `Portal.run()`.
Termination rules:
- if the local task signals stop iteration a cancel signal is
relayed to the remote task indicating to stop streaming
- if the remote task signals the end of a stream, raise a
``StopAsyncIteration`` to terminate the local ``async for``
def context(func: Callable) -> Callable:
"""Mark an async function as a streaming routine with ``@context``.
"""
def __init__(
self,
ctx: Context,
rx_chan: trio.abc.ReceiveChannel,
portal: 'Portal', # type: ignore # noqa
) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._portal = portal
self._shielded = False
# annotate
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore
# delegate directly to underlying mem channel
def receive_nowait(self):
return self._rx_chan.receive_nowait()
async def receive(self):
try:
msg = await self._rx_chan.receive()
return msg['yield']
except KeyError:
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10 match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
# TODO: test that shows stream raising an expected error!!!
if msg.get('error'):
# raise the error message
raise unpack_error(msg, self._portal.channel)
except (trio.ClosedResourceError, StopAsyncIteration):
# XXX: this indicates that a `stop` message was
# sent by the far side of the underlying channel.
# Currently this is triggered by calling ``.aclose()`` on
# the send side of the channel inside
# ``Actor._push_result()``, but maybe it should be put here?
# to avoid exposing the internal mem chan closing mechanism?
# in theory we could instead do some flushing of the channel
# if needed to ensure all consumers are complete before
# triggering closure too early?
# Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically.
# We **don't** want to be closing this send channel and not
# relaying a final value to remaining consumers who may not
# have been scheduled to receive it yet?
# lots of testing to do here
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
raise StopAsyncIteration
except trio.Cancelled:
# relay cancels to the remote task
await self.aclose()
raise
@contextmanager
def shield(
self
) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
"""
self._shielded = True
yield self
self._shielded = False
async def aclose(self):
"""Cancel associated remote actor task and local memory channel
on close.
"""
rx_chan = self._rx_chan
if rx_chan._closed:
log.warning(f"{self} is already closed")
return
# stats = rx_chan.statistics()
# if stats.open_receive_channels > 1:
# # if we've been cloned don't kill the stream
# log.debug(
# "there are still consumers running keeping stream alive")
# return
if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return
# close the local mem chan
rx_chan.close()
# cancel surrounding IPC context
await self._ctx.cancel()
# TODO: but make it broadcasting to consumers
# def clone(self):
# """Clone this receive channel allowing for multi-task
# consumption from the same channel.
# """
# return ReceiveStream(
# self._cid,
# self._rx_chan.clone(),
# self._portal,
# )
# class MsgStream(ReceiveMsgStream, trio.abc.Channel):
# """
# Bidirectional message stream for use within an inter-actor actor
# ``Context```.
# """
# async def send(
# self,
# data: Any
# ) -> None:
# await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
sig = inspect.signature(func)
params = sig.parameters
if 'ctx' not in params:
raise TypeError(
"The first argument to the context function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func

View File

@ -11,7 +11,8 @@ import warnings
import trio
from async_generator import asynccontextmanager
from ._state import current_actor, is_main_process
from . import _debug
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
@ -60,6 +61,8 @@ class ActorNursery:
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
infect_asyncio: bool = False,
debug_mode: Optional[bool] = None,
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
@ -67,6 +70,10 @@ class ActorNursery:
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
# allow setting debug policy per actor
if debug_mode is not None:
_rtv['_debug_mode'] = debug_mode
enable_modules = enable_modules or []
if rpc_module_paths:
@ -103,6 +110,7 @@ class ActorNursery:
bind_addr,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
)
)
@ -115,6 +123,7 @@ class ActorNursery:
rpc_module_paths: Optional[List[str]] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn``
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
@ -139,6 +148,7 @@ class ActorNursery:
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
)
# XXX: don't allow stream funcs
@ -253,6 +263,26 @@ async def _open_and_supervise_one_cancels_all_nursery(
"to complete"
)
except BaseException as err:
if is_root_process() and (
type(err) in {
Exception, trio.MultiError, trio.Cancelled
}
):
# if we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty streams.
# instead try to wait for pdb to be released before
# tearing down.
debug_complete = _debug._pdb_complete
if debug_complete and not debug_complete.is_set():
log.warning(
"Root has errored but pdb is active..waiting "
"on debug lock")
await _debug._pdb_complete.wait()
# raise
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
@ -357,7 +387,8 @@ async def open_nursery(
try:
if actor is None and is_main_process():
# if we are the parent process start the actor runtime implicitly
# if we are the parent process start the
# actor runtime implicitly
log.info("Starting actor runtime!")
# mark us for teardown on exit
@ -366,6 +397,7 @@ async def open_nursery(
async with open_root_actor(**kwargs) as actor:
assert actor is current_actor()
# try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
@ -376,7 +408,6 @@ async def open_nursery(
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
finally:

View File

@ -101,6 +101,7 @@ def pub(
wrapped: typing.Callable = None,
*,
tasks: Set[str] = set(),
send_on_connect: Any = None,
):
"""Publisher async generator decorator.
@ -186,7 +187,7 @@ def pub(
# handle the decorator not called with () case
if wrapped is None:
return partial(pub, tasks=tasks)
return partial(pub, tasks=tasks, send_on_connect=send_on_connect)
task2lock: Dict[str, trio.StrictFIFOLock] = {}
@ -229,6 +230,11 @@ def pub(
try:
modify_subs(topics2ctxs, topics, ctx)
# if specified send the startup message back to consumer
if send_on_connect is not None:
await ctx.send_yield(send_on_connect)
# block and let existing feed task deliver
# stream data until it is cancelled in which case
# the next waiting task will take over and spawn it again

View File

@ -0,0 +1,311 @@
"""
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
"""
import asyncio
import inspect
from typing import (
Any,
Callable,
AsyncIterator,
Awaitable,
)
import trio
from .log import get_logger
from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run_task', 'run_as_asyncio_guest']
async def run_coro(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
) -> None:
"""Await ``coro`` and relay result back to ``trio``.
"""
to_trio.send_nowait(await coro)
async def consume_asyncgen(
to_trio: trio.MemorySendChannel,
coro: AsyncIterator,
) -> None:
"""Stream async generator results back to ``trio``.
``from_trio`` might eventually be used here for
bidirectional streaming.
"""
async for item in coro:
to_trio.send_nowait(item)
def _run_asyncio_task(
func: Callable,
*,
qsize: int = 1,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
assert current_actor().is_infected_aio()
# ITC (inter task comms)
from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
from_aio._err = None
args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return
# value otherwise it would just return ;P
# _treat_as_stream = True
assert qsize > 1
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
# if 'from_aio' in args:
# kwargs['from_aio'] = from_aio
coro = func(**kwargs)
# cancel_scope = trio.CancelScope()
# start the asyncio task we submitted from trio
if inspect.isawaitable(coro):
task = asyncio.create_task(run_coro(to_trio, coro))
elif inspect.isasyncgen(coro):
task = asyncio.create_task(consume_asyncgen(to_trio, coro))
else:
raise TypeError(f"No support for invoking {coro}")
aio_err = None
def cancel_trio(task):
"""Cancel the calling ``trio`` task on error.
"""
nonlocal aio_err
try:
aio_err = task.exception()
except asyncio.CancelledError as cerr:
aio_err = cerr
if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}")
# cancel_scope.cancel()
from_aio._err = aio_err
task.add_done_callback(cancel_trio)
return task, from_aio, to_trio
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
# assert current_actor().is_infected_aio()
# # ITC (inter task comms)
# from_trio = asyncio.Queue(qsize) # type: ignore
# to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
# args = tuple(inspect.getfullargspec(func).args)
# if getattr(func, '_tractor_steam_function', None):
# # the assumption is that the target async routine accepts the
# # send channel then it intends to yield more then one return
# # value otherwise it would just return ;P
# _treat_as_stream = True
# # allow target func to accept/stream results manually by name
# if 'to_trio' in args:
# kwargs['to_trio'] = to_trio
# if 'from_trio' in args:
# kwargs['from_trio'] = from_trio
# coro = func(**kwargs)
# cancel_scope = trio.CancelScope()
# # start the asyncio task we submitted from trio
# if inspect.isawaitable(coro):
# task = asyncio.create_task(run_coro(to_trio, coro))
# elif inspect.isasyncgen(coro):
# task = asyncio.create_task(consume_asyncgen(to_trio, coro))
# else:
# raise TypeError(f"No support for invoking {coro}")
# aio_err = None
# def cancel_trio(task):
# """Cancel the calling ``trio`` task on error.
# """
# nonlocal aio_err
# aio_err = task.exception()
# if aio_err:
# log.exception(f"asyncio task errorred:\n{aio_err}")
# cancel_scope.cancel()
# task.add_done_callback(cancel_trio)
# async iterator
# if inspect.isasyncgen(coro) or _treat_as_stream:
# if inspect.isasyncgenfunction(meth) or :
if _treat_as_stream:
task, from_aio, to_trio = _run_asyncio_task(
func,
qsize=2**8,
**kwargs,
)
return from_aio
# async def stream_results():
# try:
# with cancel_scope:
# # stream values upward
# async with from_aio:
# async for item in from_aio:
# yield item
# if cancel_scope.cancelled_caught:
# # always raise from any captured asyncio error
# if aio_err:
# raise aio_err
# except BaseException as err:
# if aio_err is not None:
# # always raise from any captured asyncio error
# raise err from aio_err
# else:
# raise
# finally:
# # breakpoint()
# task.cancel()
# return stream_results()
# simple async func
try:
task, from_aio, to_trio = _run_asyncio_task(
func,
qsize=1,
**kwargs,
)
# with cancel_scope:
# async with from_aio:
# return single value
return await from_aio.receive()
# if cancel_scope.cancelled_caught:
# # always raise from any captured asyncio error
# if aio_err:
# raise aio_err
# Do we need this?
except Exception as err:
# await tractor.breakpoint()
aio_err = from_aio._err
# try:
if aio_err is not None:
# always raise from any captured asyncio error
raise err from aio_err
else:
raise
# finally:
# if not task.done():
# task.cancel()
except trio.Cancelled:
if not task.done():
task.cancel()
raise
# async def stream_from_task
# pass
def run_as_asyncio_guest(
trio_main: Callable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything get's too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
# might as well if it's installed.
try:
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
except ImportError:
pass
asyncio.run(aio_main(trio_main))