forked from goodboy/tractor
1
0
Fork 0

Compare commits

...

55 Commits

Author SHA1 Message Date
Tyler Goodlet 57edf481e8 Always raise asyncio errors 2021-09-08 07:46:55 -04:00
Tyler Goodlet d6ddc47e58 Test non-shielding root lock acquire on breakpoint entry 2021-09-08 07:46:55 -04:00
Tyler Goodlet 86f4f2df6f Drop old implementation cruft 2021-09-08 07:46:55 -04:00
Tyler Goodlet 2bd5ba76b9 Fix error propagation on asyncio streaming tasks 2021-09-08 07:46:55 -04:00
Tyler Goodlet a4859c969c Drop bad .close() call 2021-09-08 07:46:55 -04:00
Tyler Goodlet 2dfa12c743 Proxy asyncio cancelleds as well 2021-09-08 07:46:55 -04:00
Tyler Goodlet f812c344a7 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
2021-09-08 07:46:55 -04:00
Tyler Goodlet e161f7bac0 WIP redo asyncio async gen streaming 2021-09-08 07:46:55 -04:00
Tyler Goodlet 3fd28ee3a5 Support asyncio actors with the trio spawner backend 2021-09-08 07:46:55 -04:00
Tyler Goodlet 8dba692ef5 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-09-08 07:46:55 -04:00
Tyler Goodlet 673aeef4e9 Support asyncio actors with the trio spawner backend 2021-09-08 07:46:55 -04:00
Tyler Goodlet 9e6f75a592 Link to SC on wikipedia 2021-09-08 07:46:55 -04:00
Tyler Goodlet 76f9ff608c Add per actor debug mode toggle 2021-09-08 07:46:55 -04:00
Tyler Goodlet cbdf23ee6b Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-09-08 07:46:55 -04:00
Tyler Goodlet cb43c4c428 Pass func refs 2021-09-08 07:46:55 -04:00
Tyler Goodlet f154f492fc Add initial infected asyncio error propagation test 2021-09-08 07:46:55 -04:00
Tyler Goodlet 0802736095 Raise any asyncio errors if in trio task on cancel 2021-09-08 07:46:55 -04:00
Tyler Goodlet 68e5c2a95f Raise from asyncio error; fixes mypy 2021-09-08 07:46:55 -04:00
Tyler Goodlet 2adb59f40f Tweak log msg 2021-09-08 07:46:55 -04:00
Tyler Goodlet 0bac1f3021 Log error 2021-09-08 07:46:55 -04:00
Tyler Goodlet 25c19b9274 Support asyncio actors with the trio spawner backend 2021-09-08 07:46:55 -04:00
Tyler Goodlet 86089800ab Revert removal of `infect_asyncio` in nursery start methods 2021-09-08 07:46:55 -04:00
Tyler Goodlet 92594d8222 Attempt to make mypy happy.. 2021-09-08 07:46:55 -04:00
Tyler Goodlet 45a743cdd4 Add an obnoxious error message on internal failures 2021-09-08 07:46:55 -04:00
Tyler Goodlet a2d119ab56 Wow, fix all the broken async func invoking code..
Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
2021-09-08 07:46:55 -04:00
Tyler Goodlet 24a63415ef Drop entrypoints from `Actor` 2021-09-08 07:46:55 -04:00
Tyler Goodlet 9b7a4a1cd5 Move asyncio guest mode entrypoint to `to_asyncio`
The function is useful if you want to run the "main process" under
`asyncio`. Until `trio` core wraps this better we'll keep our own copy
in the interim (there's a new "inside-out-guest" mode almost on
mainline so hang tight).
2021-09-08 07:46:55 -04:00
Tyler Goodlet d55671f68b Propagate any spawned `asyncio` task error upwards
This should mostly maintain top level SC principles for any task spawned
using `tractor.to_asyncio.run()`. When the `asyncio` task completes make
sure to cancel the pertaining `trio` cancel scope and raise any error
that may have resulted.

Resolves #120
2021-09-08 07:46:55 -04:00
Tyler Goodlet 38d4fe31ac Add a @pub kwarg to allow specifying a "startup response message" 2021-09-08 07:46:55 -04:00
Tyler Goodlet 73f814e0d8 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-09-08 07:46:55 -04:00
Tyler Goodlet e8b282810e Add fragment 2021-09-07 21:37:57 -04:00
Tyler Goodlet 22383d1ed9 Add `msgspec` mentions to readme 2021-09-07 21:27:05 -04:00
Tyler Goodlet 0d41f1410f Add a stream type factory 2021-09-07 21:07:33 -04:00
Tyler Goodlet 6cf4a80fe4 Don't expect list value from registry 2021-09-07 20:25:40 -04:00
Tyler Goodlet c188008844 Ugh, appease mypy yet again 2021-09-07 20:24:02 -04:00
Tyler Goodlet 593fd24a9e Attempt to gracefully handle channel breakage? 2021-09-07 16:59:40 -04:00
Tyler Goodlet bb8452dbdb Ensure tuple for passed in arbiter addr 2021-09-06 12:07:09 -04:00
Tyler Goodlet 82999d10df Pin to latest and greatest `msgspec` 2021-09-06 11:53:50 -04:00
Tyler Goodlet a085111173 Call registry getter method in test 2021-09-06 11:53:50 -04:00
Tyler Goodlet c46bf6b3c4 Cast `defaultdict` to `dict` for registry get 2021-09-06 11:53:50 -04:00
Tyler Goodlet b8b264ae54 Map broken stream errs to transport closed; msgspec seems to be racy 2021-09-05 18:48:25 -04:00
Tyler Goodlet c27b00687c Convert actor UIDs to hashable tuples
`msgspec` sends python lists over the wire
(https://github.com/jcrist/msgspec/issues/30) which is fine and dandy
but we use them as lookup keys so we need to be sure we tuple-cast
first.
2021-09-05 18:27:50 -04:00
Tyler Goodlet fa6d9bef52 Fix log levels 2021-09-05 16:29:16 -04:00
Tyler Goodlet bdde646d4c Add msgspec installs, drop py3.7 2021-09-05 16:26:36 -04:00
Tyler Goodlet 7d0541d864 Mypy fixes to enforce uid tuple 2021-09-05 16:26:36 -04:00
Tyler Goodlet 7888de6070 Fix py version classifier 2021-09-05 16:26:36 -04:00
Tyler Goodlet 3b2598a060 Pkg `msgpec` as optional dep, load transport type if importable 2021-09-05 16:26:34 -04:00
Tyler Goodlet eb44244f24 Accept transport closed error during handshake and msg loop 2021-09-05 16:25:41 -04:00
Tyler Goodlet 7b902b7e9c Drop happy eyeballs inf delay 2021-09-05 16:23:33 -04:00
Tyler Goodlet fdd2da238a Add our own "transport closed" signal
This change some super old (and bad) code from the project's very early
days. For some redic reason i must have thought masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol. This begins to resolve that by adding our own
special `TransportClosed` error to signal the "graceful" termination of
a channel's underlying transport. Oh, and this builds on the `msgspec`
integration which helped shed light on the core issues here B)
2021-09-05 16:23:10 -04:00
Tyler Goodlet bc6af2219e Add streaming decode support for `msgspec`
Add a `tractor._ipc.MsgspecStream` type which can be swapped in for
`msgspec` serialization transparently. A small msg-length-prefix framing
is implemented as part of the type and we use
`tricycle.BufferedReceieveStream` to handle buffering logic for the
underlying transport.

Notes:
- had to force cast a few more list  -> tuple spots due to no native
  `tuple`decode-by-default in `msgspec`: https://github.com/jcrist/msgspec/issues/30
- the framing can be understood by this protobuf walkthrough:
  https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
- `tricycle` becomes a new dependency
2021-09-05 16:18:42 -04:00
Tyler Goodlet 5e03108211 Always cast arbiter addr to tuple 2021-09-05 15:27:08 -04:00
Tyler Goodlet 132b9651dd Add `tricycle` and `msgspec` deps 2021-09-05 15:27:07 -04:00
Tyler Goodlet adc77861bb Try out `msgspec` in our msgpack stream channel
Can only really use an encoder currently since there is no streaming api
in `msgspec` as of currently. See jcrist/msgspec#27.

Not sure if any encoding speedups are currently noticeable especially
without any validation going on yet XD.

First experiments toward #196
2021-09-05 15:26:00 -04:00
Tyler Goodlet 93a83eab1c Cast to tuples for all uids explicitly 2021-09-05 15:22:40 -04:00
17 changed files with 593 additions and 66 deletions

View File

@ -3,6 +3,7 @@ name: CI
on: push on: push
jobs: jobs:
mypy: mypy:
name: 'MyPy' name: 'MyPy'
runs-on: ubuntu-latest runs-on: ubuntu-latest
@ -23,23 +24,59 @@ jobs:
run: mypy tractor/ --ignore-missing-imports run: mypy tractor/ --ignore-missing-imports
testing: testing:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}' name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
timeout-minutes: 9 timeout-minutes: 9
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest] os: [ubuntu-latest, windows-latest]
python: ['3.8', '3.9'] python: ['3.8', '3.9']
spawn_backend: ['trio', 'mp'] spawn_backend: ['trio', 'mp']
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v2
- name: Setup python - name: Setup python
uses: actions/setup-python@v2 uses: actions/setup-python@v2
with: with:
python-version: '${{ matrix.python }}' python-version: '${{ matrix.python }}'
- name: Install dependencies - name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
testing-msgspec:
# runs py3.9 jobs on all OS's but with optional `msgspec` dep installed
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec'
timeout-minutes: 10
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
python: ['3.9']
spawn_backend: ['trio', 'mp']
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '${{ matrix.python }}'
- name: Install dependencies
run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs

View File

@ -24,8 +24,9 @@ Features
- Built-in inter-process streaming APIs - Built-in inter-process streaming APIs
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
- Support for a swappable, OS specific, process spawning layer - Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization, - A modular transport stack, allowing for custom serialization (eg.
communications protocols, and environment specific IPC primitives `msgspec`_), communications protocols, and environment specific IPC
primitives
- `structured concurrency`_ from the ground up - `structured concurrency`_ from the ground up
@ -322,6 +323,12 @@ From PyPi::
pip install tractor pip install tractor
To try out the (optionally) faster `msgspec`_ codec instead of the
default ``msgpack`` lib::
pip install tractor[msgspec]
From git:: From git::
pip install git+git://github.com/goodboy/tractor.git pip install git+git://github.com/goodboy/tractor.git
@ -394,7 +401,8 @@ Help us push toward the future.
- (Soon to land) ``asyncio`` support allowing for "infected" actors where - (Soon to land) ``asyncio`` support allowing for "infected" actors where
`trio` drives the `asyncio` scheduler via the astounding "`guest mode`_" `trio` drives the `asyncio` scheduler via the astounding "`guest mode`_"
- Typed messaging protocols (ex. via ``msgspec``) - Typed messaging protocols (ex. via ``msgspec``, see `#36
<https://github.com/goodboy/tractor/issues/36>`_)
- Erlang-style supervisors via composed context managers - Erlang-style supervisors via composed context managers
@ -415,6 +423,7 @@ channel`_!
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228 .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general .. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org .. _matrix channel: https://matrix.to/#/!tractor:matrix.org
@ -423,10 +432,11 @@ channel`_!
.. _messages: https://en.wikipedia.org/wiki/Message_passing .. _messages: https://en.wikipedia.org/wiki/Message_passing
.. _trio docs: https://trio.readthedocs.io/en/latest/ .. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel .. _trio-parallel: https://github.com/richardsheridan/trio-parallel
.. _msgspec: https://jcristharif.com/msgspec/
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square .. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square

View File

@ -0,0 +1,9 @@
Add optional `msgspec <https://jcristharif.com/msgspec/>`_ support over
TCP streams as an alernative, faster MessagePack codec.
This get's us moving toward typed messaging/IPC protocols. Further,
``msgspec`` structs may be a valid tool to start for formalizing our "SC
dialog un-protocol" messages as described in `#36
<https://github.com/goodboy/tractor/issues/36>`_`.

View File

@ -44,6 +44,10 @@ setup(
'async_generator', 'async_generator',
'trio_typing', 'trio_typing',
# tooling
'tricycle',
'trio_typing',
# tooling # tooling
'colorlog', 'colorlog',
'wrapt', 'wrapt',
@ -53,6 +57,12 @@ setup(
'msgpack', 'msgpack',
], ],
extras_require={
# serialization
'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"],
},
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.8", python_requires=">=3.8",
keywords=[ keywords=[

View File

@ -42,7 +42,7 @@ async def test_reg_then_unreg(arb_addr):
await trio.sleep(0.1) await trio.sleep(0.1)
assert uid not in aportal.actor._registry assert uid not in aportal.actor._registry
sockaddrs = actor._registry[uid] sockaddrs = actor._registry.get(uid)
assert not sockaddrs assert not sockaddrs
@ -136,7 +136,7 @@ async def spawn_and_check_registry(
if actor.is_arbiter: if actor.is_arbiter:
async def get_reg(): async def get_reg():
return actor._registry return await actor.get_registry()
extra = 1 # arbiter is local root actor extra = 1 # arbiter is local root actor
else: else:
@ -187,13 +187,12 @@ async def spawn_and_check_registry(
await cancel(use_signal) await cancel(use_signal)
finally: finally:
with trio.CancelScope(shield=True): await trio.sleep(0.5)
await trio.sleep(0.5)
# all subactors should have de-registered # all subactors should have de-registered
registry = await get_reg() registry = await get_reg()
assert len(registry) == extra assert len(registry) == extra
assert actor.uid in registry assert actor.uid in registry
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@ -277,7 +276,9 @@ async def close_chans_before_nursery(
# TODO: compact this back as was in last commit once # TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207 # 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(stream_forever) as agen1: async with portal1.open_stream_from(
stream_forever
) as agen1:
async with portal2.open_stream_from( async with portal2.open_stream_from(
stream_forever stream_forever
) as agen2: ) as agen2:
@ -293,8 +294,9 @@ async def close_chans_before_nursery(
# reliably triggered by an external SIGINT. # reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel() # tractor.current_actor()._root_nursery.cancel_scope.cancel()
# XXX: THIS IS THE KEY THING that happens # XXX: THIS IS THE KEY THING that
# **before** exiting the actor nursery block # happens **before** exiting the
# actor nursery block
# also kill off channels cuz why not # also kill off channels cuz why not
await agen1.aclose() await agen1.aclose()

View File

@ -0,0 +1,24 @@
import asyncio
import pytest
import tractor
async def sleep_and_err():
await asyncio.sleep(0.1)
assert 0
async def asyncio_actor():
assert tractor.current_actor().is_infected_aio()
await tractor.to_asyncio.run_task(sleep_and_err)
def test_infected_simple_error(arb_addr):
async def main():
async with tractor.open_nursery() as n:
await n.run_in_actor(asyncio_actor, infected_asyncio=True)
with pytest.raises(tractor.RemoteActorError) as excinfo:
tractor.run(main, arbiter_addr=arb_addr)

View File

@ -1,6 +1,7 @@
""" """
Spawning basics Spawning basics
""" """
from typing import Dict, Tuple
import pytest import pytest
import trio import trio
@ -11,7 +12,11 @@ from conftest import tractor_test
data_to_pass_down = {'doggy': 10, 'kitty': 4} data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(is_arbiter, data, arb_addr): async def spawn(
is_arbiter: bool,
data: Dict,
arb_addr: Tuple[str, int],
):
namespaces = [__name__] namespaces = [__name__]
await trio.sleep(0.1) await trio.sleep(0.1)

View File

@ -280,6 +280,9 @@ class Actor:
_parent_main_data: Dict[str, str] _parent_main_data: Dict[str, str]
_parent_chan_cs: Optional[trio.CancelScope] = None _parent_chan_cs: Optional[trio.CancelScope] = None
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
def __init__( def __init__(
self, self,
name: str, name: str,
@ -317,7 +320,8 @@ class Actor:
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr
self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -429,7 +433,10 @@ class Actor:
uid = await self._do_handshake(chan) uid = await self._do_handshake(chan)
except ( except (
# we need this for ``msgspec`` for some reason?
# for now, it's been put in the stream backend.
# trio.BrokenResourceError, # trio.BrokenResourceError,
# trio.ClosedResourceError, # trio.ClosedResourceError,
TransportClosed, TransportClosed,
): ):
@ -615,6 +622,7 @@ class Actor:
# ``scope = Nursery.start()`` # ``scope = Nursery.start()``
task_status.started(loop_cs) task_status.started(loop_cs)
async for msg in chan: async for msg in chan:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.debug( log.debug(
@ -775,6 +783,7 @@ class Actor:
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive runtime state from our parent # Receive runtime state from our parent
parent_data: dict[str, Any]
parent_data = await chan.recv() parent_data = await chan.recv()
log.debug( log.debug(
"Received state from parent:\n" "Received state from parent:\n"
@ -790,7 +799,16 @@ class Actor:
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
for attr, value in parent_data.items(): for attr, value in parent_data.items():
setattr(self, attr, value)
if attr == '_arb_addr':
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
value = tuple(value) if value else None
self._arb_addr = value
else:
setattr(self, attr, value)
return chan, accept_addr return chan, accept_addr
@ -1162,6 +1180,7 @@ class Actor:
async def _do_handshake( async def _do_handshake(
self, self,
chan: Channel chan: Channel
) -> Tuple[str, str]: ) -> Tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step. """Exchange (name, UUIDs) identifiers as the first communication step.
@ -1169,15 +1188,19 @@ class Actor:
parlance. parlance.
""" """
await chan.send(self.uid) await chan.send(self.uid)
uid: Tuple[str, str] = await chan.recv() value = await chan.recv()
uid: Tuple[str, str] = (str(value[0]), str(value[1]))
if not isinstance(uid, tuple): if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!") raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid chan.uid = str(uid[0]), str(uid[1])
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid return uid
def is_infected_aio(self) -> bool:
return self._infected_aio
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who knows all the other actors and always has
@ -1191,8 +1214,13 @@ class Arbiter(Actor):
is_arbiter = True is_arbiter = True
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._registry = defaultdict(list)
self._registry: Dict[
Tuple[str, str],
Tuple[str, int],
] = {}
self._waiters = {} self._waiters = {}
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
async def find_actor(self, name: str) -> Optional[Tuple[str, int]]: async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
@ -1204,9 +1232,11 @@ class Arbiter(Actor):
async def get_registry( async def get_registry(
self self
) -> Dict[str, Tuple[str, str]]: ) -> Dict[Tuple[str, str], Tuple[str, int]]:
"""Return current name registry. '''Return current name registry.
"""
This method is async to allow for cross-actor invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the msgpack # NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (not this makes the # unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos): # arbiter suscetible to hashdos):
@ -1214,13 +1244,14 @@ class Arbiter(Actor):
return self._registry return self._registry
async def wait_for_actor( async def wait_for_actor(
self, name: str self,
name: str,
) -> List[Tuple[str, int]]: ) -> List[Tuple[str, int]]:
"""Wait for a particular actor to register. '''Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently This is a blocking call if no actor by the provided name is currently
registered. registered.
""" '''
sockaddrs = [] sockaddrs = []
for (aname, _), sockaddr in self._registry.items(): for (aname, _), sockaddr in self._registry.items():
@ -1237,10 +1268,13 @@ class Arbiter(Actor):
return sockaddrs return sockaddrs
async def register_actor( async def register_actor(
self, uid: Tuple[str, str], sockaddr: Tuple[str, int] self,
uid: Tuple[str, str],
sockaddr: Tuple[str, int]
) -> None: ) -> None:
name, uuid = uid uid = name, uuid = (str(uid[0]), str(uid[1]))
self._registry[uid] = sockaddr self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
# pop and signal all waiter events # pop and signal all waiter events
events = self._waiters.pop(name, ()) events = self._waiters.pop(name, ())
@ -1249,5 +1283,9 @@ class Arbiter(Actor):
if isinstance(event, trio.Event): if isinstance(event, trio.Event):
event.set() event.set()
async def unregister_actor(self, uid: Tuple[str, str]) -> None: async def unregister_actor(
self,
uid: Tuple[str, str]
) -> None:
uid = (str(uid[0]), str(uid[1]))
self._registry.pop(uid) self._registry.pop(uid)

View File

@ -19,12 +19,15 @@ def parse_ipaddr(arg):
return (str(host), int(port)) return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid) parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str) parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr) parser.add_argument("--parent_addr", type=parse_ipaddr)
parser.add_argument("--asyncio", action='store_true')
args = parser.parse_args() args = parser.parse_args()
subactor = Actor( subactor = Actor(
@ -36,5 +39,6 @@ if __name__ == "__main__":
_trio_main( _trio_main(
subactor, subactor,
parent_addr=args.parent_addr parent_addr=args.parent_addr,
) infect_asyncio=args.asyncio,
)

View File

@ -254,6 +254,7 @@ async def _hijack_stdin_for_child(
# assert await stream.receive() == 'pdb_unlock' # assert await stream.receive() == 'pdb_unlock'
except ( except (
trio.MultiError,
trio.BrokenResourceError, trio.BrokenResourceError,
trio.Cancelled, # by local cancellation trio.Cancelled, # by local cancellation
trio.ClosedResourceError, # by self._rx_chan trio.ClosedResourceError, # by self._rx_chan
@ -343,6 +344,7 @@ async def _breakpoint(
except tractor.ContextCancelled: except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock') log.warning('Root actor cancelled debug lock')
raise
finally: finally:
log.debug(f"Exiting debugger for actor {actor}") log.debug(f"Exiting debugger for actor {actor}")
@ -407,10 +409,14 @@ async def _breakpoint(
'Root actor attempting to shield-acquire active tty lock' 'Root actor attempting to shield-acquire active tty lock'
f' owned by {_global_actor_in_debug}') f' owned by {_global_actor_in_debug}')
with trio.CancelScope(shield=True): stats = _debug_lock.statistics()
# must shield here to avoid hitting a ``Cancelled`` and if stats.owner:
# a child getting stuck bc we clobbered the tty breakpoint()
await _debug_lock.acquire()
# with trio.CancelScope(shield=True):
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
await _debug_lock.acquire()
else: else:
# may be cancelled # may be cancelled

View File

@ -9,6 +9,7 @@ import trio # type: ignore
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from . import _state from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__) log = get_logger(__name__)
@ -20,6 +21,7 @@ def _mp_main(
forkserver_info: Tuple[Any, Any, Any, Any, Any], forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str, start_method: str,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run`` """The routine called *after fork* which invokes a fresh ``trio.run``
""" """
@ -45,7 +47,11 @@ def _mp_main(
parent_addr=parent_addr parent_addr=parent_addr
) )
try: try:
trio.run(trio_main) if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
pass # handle it the same way trio does? pass # handle it the same way trio does?
@ -57,6 +63,7 @@ def _trio_main(
actor: 'Actor', # type: ignore actor: 'Actor', # type: ignore
*, *,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
""" """
@ -66,6 +73,8 @@ def _trio_main(
log.info(f"Started new trio process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -83,7 +92,11 @@ def _trio_main(
) )
try: try:
trio.run(trio_main) if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI") log.warning(f"Actor {actor.uid} received KBI")

View File

@ -3,10 +3,11 @@ Inter-process comms abstractions
""" """
import platform import platform
import struct
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional, Type
from functools import partial
from tricycle import BufferedReceiveStream
import msgpack import msgpack
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
@ -17,14 +18,7 @@ log = get_logger(__name__)
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
log = get_logger(__name__)
# :eyeroll:
try:
import msgpack_numpy
Unpacker = msgpack_numpy.Unpacker
except ImportError:
# just plain ``msgpack`` requires tweaking key settings
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
class MsgpackTCPStream: class MsgpackTCPStream:
@ -40,26 +34,28 @@ class MsgpackTCPStream:
self.stream = stream self.stream = stream
assert self.stream.socket assert self.stream.socket
# should both be IP sockets # should both be IP sockets
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
assert isinstance(lsockname, tuple) assert isinstance(lsockname, tuple)
self._laddr = lsockname[:2] self._laddr = lsockname[:2]
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
assert isinstance(rsockname, tuple) assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2] self._raddr = rsockname[:2]
# start and seed first entry to read loop # start first entry to read loop
self._agen = self._iter_packets() self._agen = self._iter_packets()
# self._agen.asend(None) is None
self._send_lock = trio.StrictFIFOLock() self._send_lock = trio.StrictFIFOLock()
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream. """Yield packets from the underlying stream.
""" """
unpacker = Unpacker( unpacker = msgpack.Unpacker(
raw=False, raw=False,
use_list=False, use_list=False,
strict_map_key=False
) )
while True: while True:
try: try:
@ -111,7 +107,8 @@ class MsgpackTCPStream:
async def send(self, data: Any) -> None: async def send(self, data: Any) -> None:
async with self._send_lock: async with self._send_lock:
return await self.stream.send_all( return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True)) msgpack.dumps(data, use_bin_type=True)
)
async def recv(self) -> Any: async def recv(self) -> Any:
return await self._agen.asend(None) return await self._agen.asend(None)
@ -123,12 +120,104 @@ class MsgpackTCPStream:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
class MsgspecTCPStream(MsgpackTCPStream):
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgspec``.
'''
def __init__(
self,
stream: trio.SocketStream,
prefix_size: int = 4,
) -> None:
import msgspec
super().__init__(stream)
self.recv_stream = BufferedReceiveStream(transport_stream=stream)
self.prefix_size = prefix_size
# TODO: struct aware messaging coders
self.encode = msgspec.Encoder().encode
self.decode = msgspec.Decoder().decode # dict[str, Any])
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
'''Yield packets from the underlying stream.
'''
import msgspec # noqa
last_decode_failed: bool = False
while True:
try:
header = await self.recv_stream.receive_exactly(4)
except (
ValueError,
# not sure entirely why we need this but without it we
# seem to be getting racy failures here on
# arbiter/registry name subs..
trio.BrokenResourceError,
):
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
if header == b'':
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore
msg_bytes = await self.recv_stream.receive_exactly(size)
log.transport(f"received {msg_bytes}") # type: ignore
try:
assert not last_decode_failed
yield self.decode(msg_bytes)
except (
msgspec.DecodingError,
UnicodeDecodeError,
):
# ignore decoding errors for now and assume they have to
# do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up.
log.error('`msgspec` failed to decode!?')
last_decode_failed = True
async def send(self, data: Any) -> None:
async with self._send_lock:
bytes_data: bytes = self.encode(data)
# supposedly the fastest says,
# https://stackoverflow.com/a/54027962
size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
def get_serializer_stream_type(
name: str,
) -> Type:
return {
'msgpack': MsgpackTCPStream,
'msgspec': MsgspecTCPStream,
}[name]
class Channel: class Channel:
"""An inter-process channel for communication between (remote) actors. '''An inter-process channel for communication between (remote) actors.
Currently the only supported transport is a ``trio.SocketStream``. Currently the only supported transport is a ``trio.SocketStream``.
"""
'''
def __init__( def __init__(
self, self,
destaddr: Optional[Tuple[str, int]] = None, destaddr: Optional[Tuple[str, int]] = None,
on_reconnect: typing.Callable[..., typing.Awaitable] = None, on_reconnect: typing.Callable[..., typing.Awaitable] = None,
@ -136,17 +225,32 @@ class Channel:
stream: trio.SocketStream = None, # expected to be active stream: trio.SocketStream = None, # expected to be active
) -> None: ) -> None:
self._recon_seq = on_reconnect self._recon_seq = on_reconnect
self._autorecon = auto_reconnect self._autorecon = auto_reconnect
self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream(
# TODO: maybe expose this through the nursery api?
try:
# if installed load the msgspec transport since it's faster
import msgspec # noqa
serializer = 'msgspec'
except ImportError:
serializer = 'msgpack'
self.stream_serializer_type = get_serializer_stream_type(serializer)
self.msgstream = self.stream_serializer_type(
stream) if stream else None stream) if stream else None
if self.msgstream and destaddr: if self.msgstream and destaddr:
raise ValueError( raise ValueError(
f"A stream was provided with local addr {self.laddr}" f"A stream was provided with local addr {self.laddr}"
) )
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: Optional[Tuple[str, str]] = None self.uid: Optional[Tuple[str, str]] = None
# set if far end actor errors internally # set if far end actor errors internally
self._exc: Optional[Exception] = None self._exc: Optional[Exception] = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
@ -169,7 +273,6 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, self,
destaddr: Tuple[Any, ...] = None, destaddr: Tuple[Any, ...] = None,
**kwargs **kwargs
@ -186,7 +289,7 @@ class Channel:
*destaddr, *destaddr,
**kwargs **kwargs
) )
self.msgstream = MsgpackTCPStream(stream) self.msgstream = self.stream_serializer_type(stream)
log.transport( log.transport(
f'Opened channel to peer {self.laddr} -> {self.raddr}' f'Opened channel to peer {self.laddr} -> {self.raddr}'

View File

@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled
# set at startup and after forks # set at startup and after forks
_default_arbiter_host = '127.0.0.1' _default_arbiter_host: str = '127.0.0.1'
_default_arbiter_port = 1616 _default_arbiter_port: int = 1616
logger = log.get_logger('tractor') logger = log.get_logger('tractor')
@ -32,7 +32,7 @@ logger = log.get_logger('tractor')
async def open_root_actor( async def open_root_actor(
# defaults are above # defaults are above
arbiter_addr: Tuple[str, int] = ( arbiter_addr: Optional[Tuple[str, int]] = (
_default_arbiter_host, _default_arbiter_host,
_default_arbiter_port, _default_arbiter_port,
), ),
@ -97,7 +97,7 @@ async def open_root_actor(
arbiter_addr = (host, port) = arbiter_addr or ( arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host, _default_arbiter_host,
_default_arbiter_port _default_arbiter_port,
) )
loglevel = loglevel or log.get_loglevel() loglevel = loglevel or log.get_loglevel()
@ -238,7 +238,7 @@ def run(
def run_daemon( def run_daemon(
rpc_module_paths: List[str], enable_modules: List[str],
**kwargs **kwargs
) -> None: ) -> None:
"""Spawn daemon actor which will respond to RPC. """Spawn daemon actor which will respond to RPC.
@ -247,9 +247,9 @@ def run_daemon(
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests. is meant to run forever responding to RPC requests.
""" """
kwargs['rpc_module_paths'] = list(rpc_module_paths) kwargs['enable_modules'] = list(enable_modules)
for path in rpc_module_paths: for path in enable_modules:
importlib.import_module(path) importlib.import_module(path)
return run(partial(trio.sleep, float('inf')), **kwargs) return run(partial(trio.sleep, float('inf')), **kwargs)

View File

@ -179,6 +179,7 @@ async def do_hard_kill(
async def spawn_subactor( async def spawn_subactor(
subactor: 'Actor', subactor: 'Actor',
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
infect_asyncio: bool,
): ):
spawn_cmd = [ spawn_cmd = [
sys.executable, sys.executable,
@ -203,6 +204,10 @@ async def spawn_subactor(
subactor.loglevel subactor.loglevel
] ]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
proc = await trio.open_process(spawn_cmd) proc = await trio.open_process(spawn_cmd)
try: try:
yield proc yield proc
@ -227,6 +232,7 @@ async def new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
"""Create a new ``multiprocessing.Process`` using the """Create a new ``multiprocessing.Process`` using the
@ -242,6 +248,7 @@ async def new_proc(
async with spawn_subactor( async with spawn_subactor(
subactor, subactor,
parent_addr, parent_addr,
infect_asyncio=infect_asyncio
) as proc: ) as proc:
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
@ -324,6 +331,7 @@ async def new_proc(
bind_addr=bind_addr, bind_addr=bind_addr,
parent_addr=parent_addr, parent_addr=parent_addr,
_runtime_vars=_runtime_vars, _runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status, task_status=task_status,
) )
@ -339,6 +347,7 @@ async def mp_new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
@ -384,6 +393,7 @@ async def mp_new_proc(
fs_info, fs_info,
start_method, start_method,
parent_addr, parent_addr,
infect_asyncio,
), ),
# daemon=True, # daemon=True,
name=name, name=name,

View File

@ -61,6 +61,8 @@ class ActorNursery:
enable_modules: List[str] = None, enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None, nursery: trio.Nursery = None,
infect_asyncio: bool = False,
debug_mode: Optional[bool] = None,
) -> Portal: ) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
@ -68,6 +70,10 @@ class ActorNursery:
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
# allow setting debug policy per actor
if debug_mode is not None:
_rtv['_debug_mode'] = debug_mode
enable_modules = enable_modules or [] enable_modules = enable_modules or []
if rpc_module_paths: if rpc_module_paths:
@ -104,6 +110,7 @@ class ActorNursery:
bind_addr, bind_addr,
parent_addr, parent_addr,
_rtv, # run time vars _rtv, # run time vars
infect_asyncio=infect_asyncio,
) )
) )
@ -116,6 +123,7 @@ class ActorNursery:
rpc_module_paths: Optional[List[str]] = None, rpc_module_paths: Optional[List[str]] = None,
enable_modules: List[str] = None, enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``
) -> Portal: ) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and """Spawn a new actor, run a lone task, then terminate the actor and
@ -140,6 +148,7 @@ class ActorNursery:
loglevel=loglevel, loglevel=loglevel,
# use the run_in_actor nursery # use the run_in_actor nursery
nursery=self._ria_nursery, nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
) )
# XXX: don't allow stream funcs # XXX: don't allow stream funcs

View File

@ -121,6 +121,7 @@ def pub(
wrapped: typing.Callable = None, wrapped: typing.Callable = None,
*, *,
tasks: Set[str] = set(), tasks: Set[str] = set(),
send_on_connect: Any = None,
): ):
"""Publisher async generator decorator. """Publisher async generator decorator.
@ -206,7 +207,7 @@ def pub(
# handle the decorator not called with () case # handle the decorator not called with () case
if wrapped is None: if wrapped is None:
return partial(pub, tasks=tasks) return partial(pub, tasks=tasks, send_on_connect=send_on_connect)
task2lock: Dict[str, trio.StrictFIFOLock] = {} task2lock: Dict[str, trio.StrictFIFOLock] = {}
@ -249,6 +250,11 @@ def pub(
try: try:
modify_subs(topics2ctxs, topics, ctx) modify_subs(topics2ctxs, topics, ctx)
# if specified send the startup message back to consumer
if send_on_connect is not None:
await ctx.send_yield(send_on_connect)
# block and let existing feed task deliver # block and let existing feed task deliver
# stream data until it is cancelled in which case # stream data until it is cancelled in which case
# the next waiting task will take over and spawn it again # the next waiting task will take over and spawn it again

View File

@ -0,0 +1,241 @@
'''
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
'''
import asyncio
import inspect
from typing import (
Any,
Callable,
AsyncIterator,
Awaitable,
)
import trio
from .log import get_logger
from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run_task', 'run_as_asyncio_guest']
async def run_coro(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
) -> None:
"""Await ``coro`` and relay result back to ``trio``.
"""
to_trio.send_nowait(await coro)
async def consume_asyncgen(
to_trio: trio.MemorySendChannel,
coro: AsyncIterator,
) -> None:
"""Stream async generator results back to ``trio``.
``from_trio`` might eventually be used here for
bidirectional streaming.
"""
async for item in coro:
to_trio.send_nowait(item)
def _run_asyncio_task(
func: Callable,
*,
qsize: int = 1,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
assert current_actor().is_infected_aio()
# ITC (inter task comms)
from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
from_aio._err = None
args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return
# value otherwise it would just return ;P
# _treat_as_stream = True
assert qsize > 1
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
# if 'from_aio' in args:
# kwargs['from_aio'] = from_aio
coro = func(**kwargs)
cancel_scope = trio.CancelScope()
# start the asyncio task we submitted from trio
if inspect.isawaitable(coro):
task = asyncio.create_task(run_coro(to_trio, coro))
elif inspect.isasyncgen(coro):
task = asyncio.create_task(consume_asyncgen(to_trio, coro))
else:
raise TypeError(f"No support for invoking {coro}")
aio_err = None
def cancel_trio(task):
"""Cancel the calling ``trio`` task on error.
"""
nonlocal aio_err
try:
aio_err = task.exception()
except asyncio.CancelledError as cerr:
aio_err = cerr
if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}")
cancel_scope.cancel()
from_aio._err = aio_err
from_aio.close()
task.add_done_callback(cancel_trio)
return task, from_aio, to_trio, cancel_scope
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
# streaming ``asyncio`` task
if _treat_as_stream:
task, from_aio, to_trio, cs = _run_asyncio_task(
func,
qsize=2**8,
**kwargs,
)
# naively expect the mem chan api to do the job
# of handling cross-framework cancellations / errors
return from_aio
# simple async func
try:
task, from_aio, to_trio, cs = _run_asyncio_task(
func,
qsize=1,
**kwargs,
)
# return single value
with cs:
return await from_aio.receive()
except trio.Cancelled:
if not task.done():
task.cancel()
raise
finally:
if from_aio._err:
raise from_aio._err
# TODO: explicit api for the streaming case where
# we pull from the mem chan in an async generator?
# This ends up looking more like our ``Portal.open_stream_from()``
# NB: code below is untested.
# @asynccontextmanager
# async def stream_from_task(
# target: Callable[Any],
# **kwargs,
# ) -> AsyncIterator[Any]:
# from_aoi = await run_task(target, _treat_as_stream=True, **kwargs)
# with cancel_scope:
# # stream values upward
# async with from_aio:
# async for item in from_aio:
# yield item
def run_as_asyncio_guest(
trio_main: Callable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything get's too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
# might as well if it's installed.
try:
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
except ImportError:
pass
asyncio.run(aio_main(trio_main))