Compare commits
55 Commits
master
...
msgspec_in
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 57edf481e8 | |
Tyler Goodlet | d6ddc47e58 | |
Tyler Goodlet | 86f4f2df6f | |
Tyler Goodlet | 2bd5ba76b9 | |
Tyler Goodlet | a4859c969c | |
Tyler Goodlet | 2dfa12c743 | |
Tyler Goodlet | f812c344a7 | |
Tyler Goodlet | e161f7bac0 | |
Tyler Goodlet | 3fd28ee3a5 | |
Tyler Goodlet | 8dba692ef5 | |
Tyler Goodlet | 673aeef4e9 | |
Tyler Goodlet | 9e6f75a592 | |
Tyler Goodlet | 76f9ff608c | |
Tyler Goodlet | cbdf23ee6b | |
Tyler Goodlet | cb43c4c428 | |
Tyler Goodlet | f154f492fc | |
Tyler Goodlet | 0802736095 | |
Tyler Goodlet | 68e5c2a95f | |
Tyler Goodlet | 2adb59f40f | |
Tyler Goodlet | 0bac1f3021 | |
Tyler Goodlet | 25c19b9274 | |
Tyler Goodlet | 86089800ab | |
Tyler Goodlet | 92594d8222 | |
Tyler Goodlet | 45a743cdd4 | |
Tyler Goodlet | a2d119ab56 | |
Tyler Goodlet | 24a63415ef | |
Tyler Goodlet | 9b7a4a1cd5 | |
Tyler Goodlet | d55671f68b | |
Tyler Goodlet | 38d4fe31ac | |
Tyler Goodlet | 73f814e0d8 | |
Tyler Goodlet | e8b282810e | |
Tyler Goodlet | 22383d1ed9 | |
Tyler Goodlet | 0d41f1410f | |
Tyler Goodlet | 6cf4a80fe4 | |
Tyler Goodlet | c188008844 | |
Tyler Goodlet | 593fd24a9e | |
Tyler Goodlet | bb8452dbdb | |
Tyler Goodlet | 82999d10df | |
Tyler Goodlet | a085111173 | |
Tyler Goodlet | c46bf6b3c4 | |
Tyler Goodlet | b8b264ae54 | |
Tyler Goodlet | c27b00687c | |
Tyler Goodlet | fa6d9bef52 | |
Tyler Goodlet | bdde646d4c | |
Tyler Goodlet | 7d0541d864 | |
Tyler Goodlet | 7888de6070 | |
Tyler Goodlet | 3b2598a060 | |
Tyler Goodlet | eb44244f24 | |
Tyler Goodlet | 7b902b7e9c | |
Tyler Goodlet | fdd2da238a | |
Tyler Goodlet | bc6af2219e | |
Tyler Goodlet | 5e03108211 | |
Tyler Goodlet | 132b9651dd | |
Tyler Goodlet | adc77861bb | |
Tyler Goodlet | 93a83eab1c |
|
@ -3,6 +3,7 @@ name: CI
|
||||||
on: push
|
on: push
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
|
||||||
mypy:
|
mypy:
|
||||||
name: 'MyPy'
|
name: 'MyPy'
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
@ -23,23 +24,59 @@ jobs:
|
||||||
run: mypy tractor/ --ignore-missing-imports
|
run: mypy tractor/ --ignore-missing-imports
|
||||||
|
|
||||||
testing:
|
testing:
|
||||||
|
|
||||||
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
|
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
|
||||||
timeout-minutes: 9
|
timeout-minutes: 9
|
||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
|
|
||||||
strategy:
|
strategy:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
os: [ubuntu-latest, windows-latest]
|
os: [ubuntu-latest, windows-latest]
|
||||||
python: ['3.8', '3.9']
|
python: ['3.8', '3.9']
|
||||||
spawn_backend: ['trio', 'mp']
|
spawn_backend: ['trio', 'mp']
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
- name: Setup python
|
- name: Setup python
|
||||||
uses: actions/setup-python@v2
|
uses: actions/setup-python@v2
|
||||||
with:
|
with:
|
||||||
python-version: '${{ matrix.python }}'
|
python-version: '${{ matrix.python }}'
|
||||||
|
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
|
||||||
|
|
||||||
|
testing-msgspec:
|
||||||
|
# runs py3.9 jobs on all OS's but with optional `msgspec` dep installed
|
||||||
|
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec'
|
||||||
|
timeout-minutes: 10
|
||||||
|
runs-on: ${{ matrix.os }}
|
||||||
|
|
||||||
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
|
matrix:
|
||||||
|
os: [ubuntu-latest, windows-latest]
|
||||||
|
python: ['3.9']
|
||||||
|
spawn_backend: ['trio', 'mp']
|
||||||
|
|
||||||
|
steps:
|
||||||
|
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Setup python
|
||||||
|
uses: actions/setup-python@v2
|
||||||
|
with:
|
||||||
|
python-version: '${{ matrix.python }}'
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
|
||||||
|
|
|
@ -24,8 +24,9 @@ Features
|
||||||
- Built-in inter-process streaming APIs
|
- Built-in inter-process streaming APIs
|
||||||
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
|
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
|
||||||
- Support for a swappable, OS specific, process spawning layer
|
- Support for a swappable, OS specific, process spawning layer
|
||||||
- A modular transport stack, allowing for custom serialization,
|
- A modular transport stack, allowing for custom serialization (eg.
|
||||||
communications protocols, and environment specific IPC primitives
|
`msgspec`_), communications protocols, and environment specific IPC
|
||||||
|
primitives
|
||||||
- `structured concurrency`_ from the ground up
|
- `structured concurrency`_ from the ground up
|
||||||
|
|
||||||
|
|
||||||
|
@ -322,6 +323,12 @@ From PyPi::
|
||||||
pip install tractor
|
pip install tractor
|
||||||
|
|
||||||
|
|
||||||
|
To try out the (optionally) faster `msgspec`_ codec instead of the
|
||||||
|
default ``msgpack`` lib::
|
||||||
|
|
||||||
|
pip install tractor[msgspec]
|
||||||
|
|
||||||
|
|
||||||
From git::
|
From git::
|
||||||
|
|
||||||
pip install git+git://github.com/goodboy/tractor.git
|
pip install git+git://github.com/goodboy/tractor.git
|
||||||
|
@ -394,7 +401,8 @@ Help us push toward the future.
|
||||||
|
|
||||||
- (Soon to land) ``asyncio`` support allowing for "infected" actors where
|
- (Soon to land) ``asyncio`` support allowing for "infected" actors where
|
||||||
`trio` drives the `asyncio` scheduler via the astounding "`guest mode`_"
|
`trio` drives the `asyncio` scheduler via the astounding "`guest mode`_"
|
||||||
- Typed messaging protocols (ex. via ``msgspec``)
|
- Typed messaging protocols (ex. via ``msgspec``, see `#36
|
||||||
|
<https://github.com/goodboy/tractor/issues/36>`_)
|
||||||
- Erlang-style supervisors via composed context managers
|
- Erlang-style supervisors via composed context managers
|
||||||
|
|
||||||
|
|
||||||
|
@ -415,6 +423,7 @@ channel`_!
|
||||||
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
|
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
|
||||||
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
|
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
|
||||||
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
|
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
|
||||||
|
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
|
||||||
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
|
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
|
||||||
.. _trio gitter channel: https://gitter.im/python-trio/general
|
.. _trio gitter channel: https://gitter.im/python-trio/general
|
||||||
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
|
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
|
||||||
|
@ -423,10 +432,11 @@ channel`_!
|
||||||
.. _messages: https://en.wikipedia.org/wiki/Message_passing
|
.. _messages: https://en.wikipedia.org/wiki/Message_passing
|
||||||
.. _trio docs: https://trio.readthedocs.io/en/latest/
|
.. _trio docs: https://trio.readthedocs.io/en/latest/
|
||||||
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
|
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
|
||||||
.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
|
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||||
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
|
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
|
||||||
.. _async generators: https://www.python.org/dev/peps/pep-0525/
|
.. _async generators: https://www.python.org/dev/peps/pep-0525/
|
||||||
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
|
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
|
||||||
|
.. _msgspec: https://jcristharif.com/msgspec/
|
||||||
|
|
||||||
|
|
||||||
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square
|
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
Add optional `msgspec <https://jcristharif.com/msgspec/>`_ support over
|
||||||
|
TCP streams as an alernative, faster MessagePack codec.
|
||||||
|
|
||||||
|
This get's us moving toward typed messaging/IPC protocols. Further,
|
||||||
|
``msgspec`` structs may be a valid tool to start for formalizing our "SC
|
||||||
|
dialog un-protocol" messages as described in `#36
|
||||||
|
<https://github.com/goodboy/tractor/issues/36>`_`.
|
||||||
|
|
||||||
|
|
10
setup.py
10
setup.py
|
@ -44,6 +44,10 @@ setup(
|
||||||
'async_generator',
|
'async_generator',
|
||||||
'trio_typing',
|
'trio_typing',
|
||||||
|
|
||||||
|
# tooling
|
||||||
|
'tricycle',
|
||||||
|
'trio_typing',
|
||||||
|
|
||||||
# tooling
|
# tooling
|
||||||
'colorlog',
|
'colorlog',
|
||||||
'wrapt',
|
'wrapt',
|
||||||
|
@ -53,6 +57,12 @@ setup(
|
||||||
'msgpack',
|
'msgpack',
|
||||||
|
|
||||||
],
|
],
|
||||||
|
extras_require={
|
||||||
|
|
||||||
|
# serialization
|
||||||
|
'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"],
|
||||||
|
|
||||||
|
},
|
||||||
tests_require=['pytest'],
|
tests_require=['pytest'],
|
||||||
python_requires=">=3.8",
|
python_requires=">=3.8",
|
||||||
keywords=[
|
keywords=[
|
||||||
|
|
|
@ -42,7 +42,7 @@ async def test_reg_then_unreg(arb_addr):
|
||||||
|
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
assert uid not in aportal.actor._registry
|
assert uid not in aportal.actor._registry
|
||||||
sockaddrs = actor._registry[uid]
|
sockaddrs = actor._registry.get(uid)
|
||||||
assert not sockaddrs
|
assert not sockaddrs
|
||||||
|
|
||||||
|
|
||||||
|
@ -136,7 +136,7 @@ async def spawn_and_check_registry(
|
||||||
if actor.is_arbiter:
|
if actor.is_arbiter:
|
||||||
|
|
||||||
async def get_reg():
|
async def get_reg():
|
||||||
return actor._registry
|
return await actor.get_registry()
|
||||||
|
|
||||||
extra = 1 # arbiter is local root actor
|
extra = 1 # arbiter is local root actor
|
||||||
else:
|
else:
|
||||||
|
@ -187,13 +187,12 @@ async def spawn_and_check_registry(
|
||||||
await cancel(use_signal)
|
await cancel(use_signal)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
with trio.CancelScope(shield=True):
|
await trio.sleep(0.5)
|
||||||
await trio.sleep(0.5)
|
|
||||||
|
|
||||||
# all subactors should have de-registered
|
# all subactors should have de-registered
|
||||||
registry = await get_reg()
|
registry = await get_reg()
|
||||||
assert len(registry) == extra
|
assert len(registry) == extra
|
||||||
assert actor.uid in registry
|
assert actor.uid in registry
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('use_signal', [False, True])
|
@pytest.mark.parametrize('use_signal', [False, True])
|
||||||
|
@ -277,7 +276,9 @@ async def close_chans_before_nursery(
|
||||||
|
|
||||||
# TODO: compact this back as was in last commit once
|
# TODO: compact this back as was in last commit once
|
||||||
# 3.9+, see https://github.com/goodboy/tractor/issues/207
|
# 3.9+, see https://github.com/goodboy/tractor/issues/207
|
||||||
async with portal1.open_stream_from(stream_forever) as agen1:
|
async with portal1.open_stream_from(
|
||||||
|
stream_forever
|
||||||
|
) as agen1:
|
||||||
async with portal2.open_stream_from(
|
async with portal2.open_stream_from(
|
||||||
stream_forever
|
stream_forever
|
||||||
) as agen2:
|
) as agen2:
|
||||||
|
@ -293,8 +294,9 @@ async def close_chans_before_nursery(
|
||||||
# reliably triggered by an external SIGINT.
|
# reliably triggered by an external SIGINT.
|
||||||
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
# XXX: THIS IS THE KEY THING that happens
|
# XXX: THIS IS THE KEY THING that
|
||||||
# **before** exiting the actor nursery block
|
# happens **before** exiting the
|
||||||
|
# actor nursery block
|
||||||
|
|
||||||
# also kill off channels cuz why not
|
# also kill off channels cuz why not
|
||||||
await agen1.aclose()
|
await agen1.aclose()
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
async def sleep_and_err():
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
|
async def asyncio_actor():
|
||||||
|
assert tractor.current_actor().is_infected_aio()
|
||||||
|
|
||||||
|
await tractor.to_asyncio.run_task(sleep_and_err)
|
||||||
|
|
||||||
|
|
||||||
|
def test_infected_simple_error(arb_addr):
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
await n.run_in_actor(asyncio_actor, infected_asyncio=True)
|
||||||
|
|
||||||
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
|
tractor.run(main, arbiter_addr=arb_addr)
|
|
@ -1,6 +1,7 @@
|
||||||
"""
|
"""
|
||||||
Spawning basics
|
Spawning basics
|
||||||
"""
|
"""
|
||||||
|
from typing import Dict, Tuple
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
|
@ -11,7 +12,11 @@ from conftest import tractor_test
|
||||||
data_to_pass_down = {'doggy': 10, 'kitty': 4}
|
data_to_pass_down = {'doggy': 10, 'kitty': 4}
|
||||||
|
|
||||||
|
|
||||||
async def spawn(is_arbiter, data, arb_addr):
|
async def spawn(
|
||||||
|
is_arbiter: bool,
|
||||||
|
data: Dict,
|
||||||
|
arb_addr: Tuple[str, int],
|
||||||
|
):
|
||||||
namespaces = [__name__]
|
namespaces = [__name__]
|
||||||
|
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
|
@ -280,6 +280,9 @@ class Actor:
|
||||||
_parent_main_data: Dict[str, str]
|
_parent_main_data: Dict[str, str]
|
||||||
_parent_chan_cs: Optional[trio.CancelScope] = None
|
_parent_chan_cs: Optional[trio.CancelScope] = None
|
||||||
|
|
||||||
|
# if started on ``asycio`` running ``trio`` in guest mode
|
||||||
|
_infected_aio: bool = False
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -317,7 +320,8 @@ class Actor:
|
||||||
# TODO: consider making this a dynamically defined
|
# TODO: consider making this a dynamically defined
|
||||||
# @dataclass once we get py3.7
|
# @dataclass once we get py3.7
|
||||||
self.loglevel = loglevel
|
self.loglevel = loglevel
|
||||||
self._arb_addr = arbiter_addr
|
|
||||||
|
self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None
|
||||||
|
|
||||||
# marked by the process spawning backend at startup
|
# marked by the process spawning backend at startup
|
||||||
# will be None for the parent most process started manually
|
# will be None for the parent most process started manually
|
||||||
|
@ -429,7 +433,10 @@ class Actor:
|
||||||
uid = await self._do_handshake(chan)
|
uid = await self._do_handshake(chan)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
# we need this for ``msgspec`` for some reason?
|
||||||
|
# for now, it's been put in the stream backend.
|
||||||
# trio.BrokenResourceError,
|
# trio.BrokenResourceError,
|
||||||
|
|
||||||
# trio.ClosedResourceError,
|
# trio.ClosedResourceError,
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
):
|
):
|
||||||
|
@ -615,6 +622,7 @@ class Actor:
|
||||||
# ``scope = Nursery.start()``
|
# ``scope = Nursery.start()``
|
||||||
task_status.started(loop_cs)
|
task_status.started(loop_cs)
|
||||||
async for msg in chan:
|
async for msg in chan:
|
||||||
|
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
|
@ -775,6 +783,7 @@ class Actor:
|
||||||
|
|
||||||
if self._spawn_method == "trio":
|
if self._spawn_method == "trio":
|
||||||
# Receive runtime state from our parent
|
# Receive runtime state from our parent
|
||||||
|
parent_data: dict[str, Any]
|
||||||
parent_data = await chan.recv()
|
parent_data = await chan.recv()
|
||||||
log.debug(
|
log.debug(
|
||||||
"Received state from parent:\n"
|
"Received state from parent:\n"
|
||||||
|
@ -790,7 +799,16 @@ class Actor:
|
||||||
_state._runtime_vars.update(rvs)
|
_state._runtime_vars.update(rvs)
|
||||||
|
|
||||||
for attr, value in parent_data.items():
|
for attr, value in parent_data.items():
|
||||||
setattr(self, attr, value)
|
|
||||||
|
if attr == '_arb_addr':
|
||||||
|
# XXX: ``msgspec`` doesn't support serializing tuples
|
||||||
|
# so just cash manually here since it's what our
|
||||||
|
# internals expect.
|
||||||
|
value = tuple(value) if value else None
|
||||||
|
self._arb_addr = value
|
||||||
|
|
||||||
|
else:
|
||||||
|
setattr(self, attr, value)
|
||||||
|
|
||||||
return chan, accept_addr
|
return chan, accept_addr
|
||||||
|
|
||||||
|
@ -1162,6 +1180,7 @@ class Actor:
|
||||||
async def _do_handshake(
|
async def _do_handshake(
|
||||||
self,
|
self,
|
||||||
chan: Channel
|
chan: Channel
|
||||||
|
|
||||||
) -> Tuple[str, str]:
|
) -> Tuple[str, str]:
|
||||||
"""Exchange (name, UUIDs) identifiers as the first communication step.
|
"""Exchange (name, UUIDs) identifiers as the first communication step.
|
||||||
|
|
||||||
|
@ -1169,15 +1188,19 @@ class Actor:
|
||||||
parlance.
|
parlance.
|
||||||
"""
|
"""
|
||||||
await chan.send(self.uid)
|
await chan.send(self.uid)
|
||||||
uid: Tuple[str, str] = await chan.recv()
|
value = await chan.recv()
|
||||||
|
uid: Tuple[str, str] = (str(value[0]), str(value[1]))
|
||||||
|
|
||||||
if not isinstance(uid, tuple):
|
if not isinstance(uid, tuple):
|
||||||
raise ValueError(f"{uid} is not a valid uid?!")
|
raise ValueError(f"{uid} is not a valid uid?!")
|
||||||
|
|
||||||
chan.uid = uid
|
chan.uid = str(uid[0]), str(uid[1])
|
||||||
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
|
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||||
return uid
|
return uid
|
||||||
|
|
||||||
|
def is_infected_aio(self) -> bool:
|
||||||
|
return self._infected_aio
|
||||||
|
|
||||||
|
|
||||||
class Arbiter(Actor):
|
class Arbiter(Actor):
|
||||||
"""A special actor who knows all the other actors and always has
|
"""A special actor who knows all the other actors and always has
|
||||||
|
@ -1191,8 +1214,13 @@ class Arbiter(Actor):
|
||||||
is_arbiter = True
|
is_arbiter = True
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self._registry = defaultdict(list)
|
|
||||||
|
self._registry: Dict[
|
||||||
|
Tuple[str, str],
|
||||||
|
Tuple[str, int],
|
||||||
|
] = {}
|
||||||
self._waiters = {}
|
self._waiters = {}
|
||||||
|
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
|
async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
|
||||||
|
@ -1204,9 +1232,11 @@ class Arbiter(Actor):
|
||||||
|
|
||||||
async def get_registry(
|
async def get_registry(
|
||||||
self
|
self
|
||||||
) -> Dict[str, Tuple[str, str]]:
|
) -> Dict[Tuple[str, str], Tuple[str, int]]:
|
||||||
"""Return current name registry.
|
'''Return current name registry.
|
||||||
"""
|
|
||||||
|
This method is async to allow for cross-actor invocation.
|
||||||
|
'''
|
||||||
# NOTE: requires ``strict_map_key=False`` to the msgpack
|
# NOTE: requires ``strict_map_key=False`` to the msgpack
|
||||||
# unpacker since we have tuples as keys (not this makes the
|
# unpacker since we have tuples as keys (not this makes the
|
||||||
# arbiter suscetible to hashdos):
|
# arbiter suscetible to hashdos):
|
||||||
|
@ -1214,13 +1244,14 @@ class Arbiter(Actor):
|
||||||
return self._registry
|
return self._registry
|
||||||
|
|
||||||
async def wait_for_actor(
|
async def wait_for_actor(
|
||||||
self, name: str
|
self,
|
||||||
|
name: str,
|
||||||
) -> List[Tuple[str, int]]:
|
) -> List[Tuple[str, int]]:
|
||||||
"""Wait for a particular actor to register.
|
'''Wait for a particular actor to register.
|
||||||
|
|
||||||
This is a blocking call if no actor by the provided name is currently
|
This is a blocking call if no actor by the provided name is currently
|
||||||
registered.
|
registered.
|
||||||
"""
|
'''
|
||||||
sockaddrs = []
|
sockaddrs = []
|
||||||
|
|
||||||
for (aname, _), sockaddr in self._registry.items():
|
for (aname, _), sockaddr in self._registry.items():
|
||||||
|
@ -1237,10 +1268,13 @@ class Arbiter(Actor):
|
||||||
return sockaddrs
|
return sockaddrs
|
||||||
|
|
||||||
async def register_actor(
|
async def register_actor(
|
||||||
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
|
self,
|
||||||
|
uid: Tuple[str, str],
|
||||||
|
sockaddr: Tuple[str, int]
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
name, uuid = uid
|
uid = name, uuid = (str(uid[0]), str(uid[1]))
|
||||||
self._registry[uid] = sockaddr
|
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
|
||||||
|
|
||||||
# pop and signal all waiter events
|
# pop and signal all waiter events
|
||||||
events = self._waiters.pop(name, ())
|
events = self._waiters.pop(name, ())
|
||||||
|
@ -1249,5 +1283,9 @@ class Arbiter(Actor):
|
||||||
if isinstance(event, trio.Event):
|
if isinstance(event, trio.Event):
|
||||||
event.set()
|
event.set()
|
||||||
|
|
||||||
async def unregister_actor(self, uid: Tuple[str, str]) -> None:
|
async def unregister_actor(
|
||||||
|
self,
|
||||||
|
uid: Tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
uid = (str(uid[0]), str(uid[1]))
|
||||||
self._registry.pop(uid)
|
self._registry.pop(uid)
|
||||||
|
|
|
@ -19,12 +19,15 @@ def parse_ipaddr(arg):
|
||||||
return (str(host), int(port))
|
return (str(host), int(port))
|
||||||
|
|
||||||
|
|
||||||
|
from ._entry import _trio_main
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument("--uid", type=parse_uid)
|
parser.add_argument("--uid", type=parse_uid)
|
||||||
parser.add_argument("--loglevel", type=str)
|
parser.add_argument("--loglevel", type=str)
|
||||||
parser.add_argument("--parent_addr", type=parse_ipaddr)
|
parser.add_argument("--parent_addr", type=parse_ipaddr)
|
||||||
|
parser.add_argument("--asyncio", action='store_true')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
subactor = Actor(
|
subactor = Actor(
|
||||||
|
@ -36,5 +39,6 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
_trio_main(
|
_trio_main(
|
||||||
subactor,
|
subactor,
|
||||||
parent_addr=args.parent_addr
|
parent_addr=args.parent_addr,
|
||||||
|
infect_asyncio=args.asyncio,
|
||||||
)
|
)
|
|
@ -254,6 +254,7 @@ async def _hijack_stdin_for_child(
|
||||||
# assert await stream.receive() == 'pdb_unlock'
|
# assert await stream.receive() == 'pdb_unlock'
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
trio.MultiError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.Cancelled, # by local cancellation
|
trio.Cancelled, # by local cancellation
|
||||||
trio.ClosedResourceError, # by self._rx_chan
|
trio.ClosedResourceError, # by self._rx_chan
|
||||||
|
@ -343,6 +344,7 @@ async def _breakpoint(
|
||||||
|
|
||||||
except tractor.ContextCancelled:
|
except tractor.ContextCancelled:
|
||||||
log.warning('Root actor cancelled debug lock')
|
log.warning('Root actor cancelled debug lock')
|
||||||
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.debug(f"Exiting debugger for actor {actor}")
|
log.debug(f"Exiting debugger for actor {actor}")
|
||||||
|
@ -407,10 +409,14 @@ async def _breakpoint(
|
||||||
'Root actor attempting to shield-acquire active tty lock'
|
'Root actor attempting to shield-acquire active tty lock'
|
||||||
f' owned by {_global_actor_in_debug}')
|
f' owned by {_global_actor_in_debug}')
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
stats = _debug_lock.statistics()
|
||||||
# must shield here to avoid hitting a ``Cancelled`` and
|
if stats.owner:
|
||||||
# a child getting stuck bc we clobbered the tty
|
breakpoint()
|
||||||
await _debug_lock.acquire()
|
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# must shield here to avoid hitting a ``Cancelled`` and
|
||||||
|
# a child getting stuck bc we clobbered the tty
|
||||||
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# may be cancelled
|
# may be cancelled
|
||||||
|
|
|
@ -9,6 +9,7 @@ import trio # type: ignore
|
||||||
|
|
||||||
from .log import get_console_log, get_logger
|
from .log import get_console_log, get_logger
|
||||||
from . import _state
|
from . import _state
|
||||||
|
from .to_asyncio import run_as_asyncio_guest
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -20,6 +21,7 @@ def _mp_main(
|
||||||
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
||||||
start_method: str,
|
start_method: str,
|
||||||
parent_addr: Tuple[str, int] = None,
|
parent_addr: Tuple[str, int] = None,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""The routine called *after fork* which invokes a fresh ``trio.run``
|
"""The routine called *after fork* which invokes a fresh ``trio.run``
|
||||||
"""
|
"""
|
||||||
|
@ -45,7 +47,11 @@ def _mp_main(
|
||||||
parent_addr=parent_addr
|
parent_addr=parent_addr
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
trio.run(trio_main)
|
if infect_asyncio:
|
||||||
|
actor._infected_aio = True
|
||||||
|
run_as_asyncio_guest(trio_main)
|
||||||
|
else:
|
||||||
|
trio.run(trio_main)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass # handle it the same way trio does?
|
pass # handle it the same way trio does?
|
||||||
|
|
||||||
|
@ -57,6 +63,7 @@ def _trio_main(
|
||||||
actor: 'Actor', # type: ignore
|
actor: 'Actor', # type: ignore
|
||||||
*,
|
*,
|
||||||
parent_addr: Tuple[str, int] = None,
|
parent_addr: Tuple[str, int] = None,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Entry point for a `trio_run_in_process` subactor.
|
"""Entry point for a `trio_run_in_process` subactor.
|
||||||
"""
|
"""
|
||||||
|
@ -66,6 +73,8 @@ def _trio_main(
|
||||||
|
|
||||||
log.info(f"Started new trio process for {actor.uid}")
|
log.info(f"Started new trio process for {actor.uid}")
|
||||||
|
|
||||||
|
log.info(f"Started new trio process for {actor.uid}")
|
||||||
|
|
||||||
if actor.loglevel is not None:
|
if actor.loglevel is not None:
|
||||||
log.info(
|
log.info(
|
||||||
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
||||||
|
@ -83,7 +92,11 @@ def _trio_main(
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
trio.run(trio_main)
|
if infect_asyncio:
|
||||||
|
actor._infected_aio = True
|
||||||
|
run_as_asyncio_guest(trio_main)
|
||||||
|
else:
|
||||||
|
trio.run(trio_main)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
log.warning(f"Actor {actor.uid} received KBI")
|
log.warning(f"Actor {actor.uid} received KBI")
|
||||||
|
|
||||||
|
|
141
tractor/_ipc.py
141
tractor/_ipc.py
|
@ -3,10 +3,11 @@ Inter-process comms abstractions
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import platform
|
import platform
|
||||||
|
import struct
|
||||||
import typing
|
import typing
|
||||||
from typing import Any, Tuple, Optional
|
from typing import Any, Tuple, Optional, Type
|
||||||
from functools import partial
|
|
||||||
|
|
||||||
|
from tricycle import BufferedReceiveStream
|
||||||
import msgpack
|
import msgpack
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
@ -17,14 +18,7 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
_is_windows = platform.system() == 'Windows'
|
_is_windows = platform.system() == 'Windows'
|
||||||
|
log = get_logger(__name__)
|
||||||
# :eyeroll:
|
|
||||||
try:
|
|
||||||
import msgpack_numpy
|
|
||||||
Unpacker = msgpack_numpy.Unpacker
|
|
||||||
except ImportError:
|
|
||||||
# just plain ``msgpack`` requires tweaking key settings
|
|
||||||
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
|
||||||
|
|
||||||
|
|
||||||
class MsgpackTCPStream:
|
class MsgpackTCPStream:
|
||||||
|
@ -40,26 +34,28 @@ class MsgpackTCPStream:
|
||||||
|
|
||||||
self.stream = stream
|
self.stream = stream
|
||||||
assert self.stream.socket
|
assert self.stream.socket
|
||||||
|
|
||||||
# should both be IP sockets
|
# should both be IP sockets
|
||||||
lsockname = stream.socket.getsockname()
|
lsockname = stream.socket.getsockname()
|
||||||
assert isinstance(lsockname, tuple)
|
assert isinstance(lsockname, tuple)
|
||||||
self._laddr = lsockname[:2]
|
self._laddr = lsockname[:2]
|
||||||
|
|
||||||
rsockname = stream.socket.getpeername()
|
rsockname = stream.socket.getpeername()
|
||||||
assert isinstance(rsockname, tuple)
|
assert isinstance(rsockname, tuple)
|
||||||
self._raddr = rsockname[:2]
|
self._raddr = rsockname[:2]
|
||||||
|
|
||||||
# start and seed first entry to read loop
|
# start first entry to read loop
|
||||||
self._agen = self._iter_packets()
|
self._agen = self._iter_packets()
|
||||||
# self._agen.asend(None) is None
|
|
||||||
|
|
||||||
self._send_lock = trio.StrictFIFOLock()
|
self._send_lock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||||
"""Yield packets from the underlying stream.
|
"""Yield packets from the underlying stream.
|
||||||
"""
|
"""
|
||||||
unpacker = Unpacker(
|
unpacker = msgpack.Unpacker(
|
||||||
raw=False,
|
raw=False,
|
||||||
use_list=False,
|
use_list=False,
|
||||||
|
strict_map_key=False
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
@ -111,7 +107,8 @@ class MsgpackTCPStream:
|
||||||
async def send(self, data: Any) -> None:
|
async def send(self, data: Any) -> None:
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
return await self.stream.send_all(
|
return await self.stream.send_all(
|
||||||
msgpack.dumps(data, use_bin_type=True))
|
msgpack.dumps(data, use_bin_type=True)
|
||||||
|
)
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
return await self._agen.asend(None)
|
return await self._agen.asend(None)
|
||||||
|
@ -123,12 +120,104 @@ class MsgpackTCPStream:
|
||||||
return self.stream.socket.fileno() != -1
|
return self.stream.socket.fileno() != -1
|
||||||
|
|
||||||
|
|
||||||
|
class MsgspecTCPStream(MsgpackTCPStream):
|
||||||
|
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||||
|
using ``msgspec``.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
stream: trio.SocketStream,
|
||||||
|
prefix_size: int = 4,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
import msgspec
|
||||||
|
|
||||||
|
super().__init__(stream)
|
||||||
|
self.recv_stream = BufferedReceiveStream(transport_stream=stream)
|
||||||
|
self.prefix_size = prefix_size
|
||||||
|
|
||||||
|
# TODO: struct aware messaging coders
|
||||||
|
self.encode = msgspec.Encoder().encode
|
||||||
|
self.decode = msgspec.Decoder().decode # dict[str, Any])
|
||||||
|
|
||||||
|
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||||
|
'''Yield packets from the underlying stream.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import msgspec # noqa
|
||||||
|
last_decode_failed: bool = False
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
header = await self.recv_stream.receive_exactly(4)
|
||||||
|
|
||||||
|
except (
|
||||||
|
ValueError,
|
||||||
|
|
||||||
|
# not sure entirely why we need this but without it we
|
||||||
|
# seem to be getting racy failures here on
|
||||||
|
# arbiter/registry name subs..
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
):
|
||||||
|
raise TransportClosed(
|
||||||
|
f'transport {self} was already closed prior ro read'
|
||||||
|
)
|
||||||
|
|
||||||
|
if header == b'':
|
||||||
|
raise TransportClosed(
|
||||||
|
f'transport {self} was already closed prior ro read'
|
||||||
|
)
|
||||||
|
|
||||||
|
size, = struct.unpack("<I", header)
|
||||||
|
|
||||||
|
log.transport(f'received header {size}') # type: ignore
|
||||||
|
|
||||||
|
msg_bytes = await self.recv_stream.receive_exactly(size)
|
||||||
|
|
||||||
|
log.transport(f"received {msg_bytes}") # type: ignore
|
||||||
|
try:
|
||||||
|
assert not last_decode_failed
|
||||||
|
yield self.decode(msg_bytes)
|
||||||
|
except (
|
||||||
|
msgspec.DecodingError,
|
||||||
|
UnicodeDecodeError,
|
||||||
|
):
|
||||||
|
# ignore decoding errors for now and assume they have to
|
||||||
|
# do with a channel drop - hope that receiving from the
|
||||||
|
# channel will raise an expected error and bubble up.
|
||||||
|
log.error('`msgspec` failed to decode!?')
|
||||||
|
last_decode_failed = True
|
||||||
|
|
||||||
|
async def send(self, data: Any) -> None:
|
||||||
|
async with self._send_lock:
|
||||||
|
|
||||||
|
bytes_data: bytes = self.encode(data)
|
||||||
|
|
||||||
|
# supposedly the fastest says,
|
||||||
|
# https://stackoverflow.com/a/54027962
|
||||||
|
size: bytes = struct.pack("<I", len(bytes_data))
|
||||||
|
|
||||||
|
return await self.stream.send_all(size + bytes_data)
|
||||||
|
|
||||||
|
|
||||||
|
def get_serializer_stream_type(
|
||||||
|
name: str,
|
||||||
|
) -> Type:
|
||||||
|
return {
|
||||||
|
'msgpack': MsgpackTCPStream,
|
||||||
|
'msgspec': MsgspecTCPStream,
|
||||||
|
}[name]
|
||||||
|
|
||||||
|
|
||||||
class Channel:
|
class Channel:
|
||||||
"""An inter-process channel for communication between (remote) actors.
|
'''An inter-process channel for communication between (remote) actors.
|
||||||
|
|
||||||
Currently the only supported transport is a ``trio.SocketStream``.
|
Currently the only supported transport is a ``trio.SocketStream``.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
destaddr: Optional[Tuple[str, int]] = None,
|
destaddr: Optional[Tuple[str, int]] = None,
|
||||||
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
||||||
|
@ -136,17 +225,32 @@ class Channel:
|
||||||
stream: trio.SocketStream = None, # expected to be active
|
stream: trio.SocketStream = None, # expected to be active
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
self._recon_seq = on_reconnect
|
self._recon_seq = on_reconnect
|
||||||
self._autorecon = auto_reconnect
|
self._autorecon = auto_reconnect
|
||||||
self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream(
|
|
||||||
|
# TODO: maybe expose this through the nursery api?
|
||||||
|
try:
|
||||||
|
# if installed load the msgspec transport since it's faster
|
||||||
|
import msgspec # noqa
|
||||||
|
serializer = 'msgspec'
|
||||||
|
except ImportError:
|
||||||
|
serializer = 'msgpack'
|
||||||
|
|
||||||
|
self.stream_serializer_type = get_serializer_stream_type(serializer)
|
||||||
|
self.msgstream = self.stream_serializer_type(
|
||||||
stream) if stream else None
|
stream) if stream else None
|
||||||
|
|
||||||
if self.msgstream and destaddr:
|
if self.msgstream and destaddr:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"A stream was provided with local addr {self.laddr}"
|
f"A stream was provided with local addr {self.laddr}"
|
||||||
)
|
)
|
||||||
|
|
||||||
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
||||||
|
|
||||||
# set after handshake - always uid of far end
|
# set after handshake - always uid of far end
|
||||||
self.uid: Optional[Tuple[str, str]] = None
|
self.uid: Optional[Tuple[str, str]] = None
|
||||||
|
|
||||||
# set if far end actor errors internally
|
# set if far end actor errors internally
|
||||||
self._exc: Optional[Exception] = None
|
self._exc: Optional[Exception] = None
|
||||||
self._agen = self._aiter_recv()
|
self._agen = self._aiter_recv()
|
||||||
|
@ -169,7 +273,6 @@ class Channel:
|
||||||
return self.msgstream.raddr if self.msgstream else None
|
return self.msgstream.raddr if self.msgstream else None
|
||||||
|
|
||||||
async def connect(
|
async def connect(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
destaddr: Tuple[Any, ...] = None,
|
destaddr: Tuple[Any, ...] = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
|
@ -186,7 +289,7 @@ class Channel:
|
||||||
*destaddr,
|
*destaddr,
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
self.msgstream = MsgpackTCPStream(stream)
|
self.msgstream = self.stream_serializer_type(stream)
|
||||||
|
|
||||||
log.transport(
|
log.transport(
|
||||||
f'Opened channel to peer {self.laddr} -> {self.raddr}'
|
f'Opened channel to peer {self.laddr} -> {self.raddr}'
|
||||||
|
|
|
@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled
|
||||||
|
|
||||||
|
|
||||||
# set at startup and after forks
|
# set at startup and after forks
|
||||||
_default_arbiter_host = '127.0.0.1'
|
_default_arbiter_host: str = '127.0.0.1'
|
||||||
_default_arbiter_port = 1616
|
_default_arbiter_port: int = 1616
|
||||||
|
|
||||||
|
|
||||||
logger = log.get_logger('tractor')
|
logger = log.get_logger('tractor')
|
||||||
|
@ -32,7 +32,7 @@ logger = log.get_logger('tractor')
|
||||||
async def open_root_actor(
|
async def open_root_actor(
|
||||||
|
|
||||||
# defaults are above
|
# defaults are above
|
||||||
arbiter_addr: Tuple[str, int] = (
|
arbiter_addr: Optional[Tuple[str, int]] = (
|
||||||
_default_arbiter_host,
|
_default_arbiter_host,
|
||||||
_default_arbiter_port,
|
_default_arbiter_port,
|
||||||
),
|
),
|
||||||
|
@ -97,7 +97,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
arbiter_addr = (host, port) = arbiter_addr or (
|
arbiter_addr = (host, port) = arbiter_addr or (
|
||||||
_default_arbiter_host,
|
_default_arbiter_host,
|
||||||
_default_arbiter_port
|
_default_arbiter_port,
|
||||||
)
|
)
|
||||||
|
|
||||||
loglevel = loglevel or log.get_loglevel()
|
loglevel = loglevel or log.get_loglevel()
|
||||||
|
@ -238,7 +238,7 @@ def run(
|
||||||
|
|
||||||
|
|
||||||
def run_daemon(
|
def run_daemon(
|
||||||
rpc_module_paths: List[str],
|
enable_modules: List[str],
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Spawn daemon actor which will respond to RPC.
|
"""Spawn daemon actor which will respond to RPC.
|
||||||
|
@ -247,9 +247,9 @@ def run_daemon(
|
||||||
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
|
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
|
||||||
is meant to run forever responding to RPC requests.
|
is meant to run forever responding to RPC requests.
|
||||||
"""
|
"""
|
||||||
kwargs['rpc_module_paths'] = list(rpc_module_paths)
|
kwargs['enable_modules'] = list(enable_modules)
|
||||||
|
|
||||||
for path in rpc_module_paths:
|
for path in enable_modules:
|
||||||
importlib.import_module(path)
|
importlib.import_module(path)
|
||||||
|
|
||||||
return run(partial(trio.sleep, float('inf')), **kwargs)
|
return run(partial(trio.sleep, float('inf')), **kwargs)
|
||||||
|
|
|
@ -179,6 +179,7 @@ async def do_hard_kill(
|
||||||
async def spawn_subactor(
|
async def spawn_subactor(
|
||||||
subactor: 'Actor',
|
subactor: 'Actor',
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
|
infect_asyncio: bool,
|
||||||
):
|
):
|
||||||
spawn_cmd = [
|
spawn_cmd = [
|
||||||
sys.executable,
|
sys.executable,
|
||||||
|
@ -203,6 +204,10 @@ async def spawn_subactor(
|
||||||
subactor.loglevel
|
subactor.loglevel
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Tell child to run in guest mode on top of ``asyncio`` loop
|
||||||
|
if infect_asyncio:
|
||||||
|
spawn_cmd.append("--asyncio")
|
||||||
|
|
||||||
proc = await trio.open_process(spawn_cmd)
|
proc = await trio.open_process(spawn_cmd)
|
||||||
try:
|
try:
|
||||||
yield proc
|
yield proc
|
||||||
|
@ -227,6 +232,7 @@ async def new_proc(
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Create a new ``multiprocessing.Process`` using the
|
"""Create a new ``multiprocessing.Process`` using the
|
||||||
|
@ -242,6 +248,7 @@ async def new_proc(
|
||||||
async with spawn_subactor(
|
async with spawn_subactor(
|
||||||
subactor,
|
subactor,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
|
infect_asyncio=infect_asyncio
|
||||||
) as proc:
|
) as proc:
|
||||||
log.runtime(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
|
@ -324,6 +331,7 @@ async def new_proc(
|
||||||
bind_addr=bind_addr,
|
bind_addr=bind_addr,
|
||||||
parent_addr=parent_addr,
|
parent_addr=parent_addr,
|
||||||
_runtime_vars=_runtime_vars,
|
_runtime_vars=_runtime_vars,
|
||||||
|
infect_asyncio=infect_asyncio,
|
||||||
task_status=task_status,
|
task_status=task_status,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -339,6 +347,7 @@ async def mp_new_proc(
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -384,6 +393,7 @@ async def mp_new_proc(
|
||||||
fs_info,
|
fs_info,
|
||||||
start_method,
|
start_method,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
|
infect_asyncio,
|
||||||
),
|
),
|
||||||
# daemon=True,
|
# daemon=True,
|
||||||
name=name,
|
name=name,
|
||||||
|
|
|
@ -61,6 +61,8 @@ class ActorNursery:
|
||||||
enable_modules: List[str] = None,
|
enable_modules: List[str] = None,
|
||||||
loglevel: str = None, # set log level per subactor
|
loglevel: str = None, # set log level per subactor
|
||||||
nursery: trio.Nursery = None,
|
nursery: trio.Nursery = None,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
|
debug_mode: Optional[bool] = None,
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
||||||
|
|
||||||
|
@ -68,6 +70,10 @@ class ActorNursery:
|
||||||
_rtv = _state._runtime_vars.copy()
|
_rtv = _state._runtime_vars.copy()
|
||||||
_rtv['_is_root'] = False
|
_rtv['_is_root'] = False
|
||||||
|
|
||||||
|
# allow setting debug policy per actor
|
||||||
|
if debug_mode is not None:
|
||||||
|
_rtv['_debug_mode'] = debug_mode
|
||||||
|
|
||||||
enable_modules = enable_modules or []
|
enable_modules = enable_modules or []
|
||||||
|
|
||||||
if rpc_module_paths:
|
if rpc_module_paths:
|
||||||
|
@ -104,6 +110,7 @@ class ActorNursery:
|
||||||
bind_addr,
|
bind_addr,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
_rtv, # run time vars
|
_rtv, # run time vars
|
||||||
|
infect_asyncio=infect_asyncio,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -116,6 +123,7 @@ class ActorNursery:
|
||||||
rpc_module_paths: Optional[List[str]] = None,
|
rpc_module_paths: Optional[List[str]] = None,
|
||||||
enable_modules: List[str] = None,
|
enable_modules: List[str] = None,
|
||||||
loglevel: str = None, # set log level per subactor
|
loglevel: str = None, # set log level per subactor
|
||||||
|
infect_asyncio: bool = False,
|
||||||
**kwargs, # explicit args to ``fn``
|
**kwargs, # explicit args to ``fn``
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
"""Spawn a new actor, run a lone task, then terminate the actor and
|
"""Spawn a new actor, run a lone task, then terminate the actor and
|
||||||
|
@ -140,6 +148,7 @@ class ActorNursery:
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
# use the run_in_actor nursery
|
# use the run_in_actor nursery
|
||||||
nursery=self._ria_nursery,
|
nursery=self._ria_nursery,
|
||||||
|
infect_asyncio=infect_asyncio,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: don't allow stream funcs
|
# XXX: don't allow stream funcs
|
||||||
|
|
|
@ -121,6 +121,7 @@ def pub(
|
||||||
wrapped: typing.Callable = None,
|
wrapped: typing.Callable = None,
|
||||||
*,
|
*,
|
||||||
tasks: Set[str] = set(),
|
tasks: Set[str] = set(),
|
||||||
|
send_on_connect: Any = None,
|
||||||
):
|
):
|
||||||
"""Publisher async generator decorator.
|
"""Publisher async generator decorator.
|
||||||
|
|
||||||
|
@ -206,7 +207,7 @@ def pub(
|
||||||
|
|
||||||
# handle the decorator not called with () case
|
# handle the decorator not called with () case
|
||||||
if wrapped is None:
|
if wrapped is None:
|
||||||
return partial(pub, tasks=tasks)
|
return partial(pub, tasks=tasks, send_on_connect=send_on_connect)
|
||||||
|
|
||||||
task2lock: Dict[str, trio.StrictFIFOLock] = {}
|
task2lock: Dict[str, trio.StrictFIFOLock] = {}
|
||||||
|
|
||||||
|
@ -249,6 +250,11 @@ def pub(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
modify_subs(topics2ctxs, topics, ctx)
|
modify_subs(topics2ctxs, topics, ctx)
|
||||||
|
|
||||||
|
# if specified send the startup message back to consumer
|
||||||
|
if send_on_connect is not None:
|
||||||
|
await ctx.send_yield(send_on_connect)
|
||||||
|
|
||||||
# block and let existing feed task deliver
|
# block and let existing feed task deliver
|
||||||
# stream data until it is cancelled in which case
|
# stream data until it is cancelled in which case
|
||||||
# the next waiting task will take over and spawn it again
|
# the next waiting task will take over and spawn it again
|
||||||
|
|
|
@ -0,0 +1,241 @@
|
||||||
|
'''
|
||||||
|
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import asyncio
|
||||||
|
import inspect
|
||||||
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Callable,
|
||||||
|
AsyncIterator,
|
||||||
|
Awaitable,
|
||||||
|
)
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from .log import get_logger
|
||||||
|
from ._state import current_actor
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ['run_task', 'run_as_asyncio_guest']
|
||||||
|
|
||||||
|
|
||||||
|
async def run_coro(
|
||||||
|
to_trio: trio.MemorySendChannel,
|
||||||
|
coro: Awaitable,
|
||||||
|
) -> None:
|
||||||
|
"""Await ``coro`` and relay result back to ``trio``.
|
||||||
|
"""
|
||||||
|
to_trio.send_nowait(await coro)
|
||||||
|
|
||||||
|
|
||||||
|
async def consume_asyncgen(
|
||||||
|
to_trio: trio.MemorySendChannel,
|
||||||
|
coro: AsyncIterator,
|
||||||
|
) -> None:
|
||||||
|
"""Stream async generator results back to ``trio``.
|
||||||
|
|
||||||
|
``from_trio`` might eventually be used here for
|
||||||
|
bidirectional streaming.
|
||||||
|
"""
|
||||||
|
async for item in coro:
|
||||||
|
to_trio.send_nowait(item)
|
||||||
|
|
||||||
|
|
||||||
|
def _run_asyncio_task(
|
||||||
|
func: Callable,
|
||||||
|
*,
|
||||||
|
qsize: int = 1,
|
||||||
|
_treat_as_stream: bool = False,
|
||||||
|
**kwargs,
|
||||||
|
) -> Any:
|
||||||
|
"""Run an ``asyncio`` async function or generator in a task, return
|
||||||
|
or stream the result back to ``trio``.
|
||||||
|
|
||||||
|
"""
|
||||||
|
assert current_actor().is_infected_aio()
|
||||||
|
|
||||||
|
# ITC (inter task comms)
|
||||||
|
from_trio = asyncio.Queue(qsize) # type: ignore
|
||||||
|
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
|
||||||
|
|
||||||
|
from_aio._err = None
|
||||||
|
|
||||||
|
args = tuple(inspect.getfullargspec(func).args)
|
||||||
|
|
||||||
|
if getattr(func, '_tractor_steam_function', None):
|
||||||
|
# the assumption is that the target async routine accepts the
|
||||||
|
# send channel then it intends to yield more then one return
|
||||||
|
# value otherwise it would just return ;P
|
||||||
|
# _treat_as_stream = True
|
||||||
|
assert qsize > 1
|
||||||
|
|
||||||
|
# allow target func to accept/stream results manually by name
|
||||||
|
if 'to_trio' in args:
|
||||||
|
kwargs['to_trio'] = to_trio
|
||||||
|
|
||||||
|
if 'from_trio' in args:
|
||||||
|
kwargs['from_trio'] = from_trio
|
||||||
|
|
||||||
|
# if 'from_aio' in args:
|
||||||
|
# kwargs['from_aio'] = from_aio
|
||||||
|
|
||||||
|
coro = func(**kwargs)
|
||||||
|
|
||||||
|
cancel_scope = trio.CancelScope()
|
||||||
|
|
||||||
|
# start the asyncio task we submitted from trio
|
||||||
|
if inspect.isawaitable(coro):
|
||||||
|
task = asyncio.create_task(run_coro(to_trio, coro))
|
||||||
|
|
||||||
|
elif inspect.isasyncgen(coro):
|
||||||
|
task = asyncio.create_task(consume_asyncgen(to_trio, coro))
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise TypeError(f"No support for invoking {coro}")
|
||||||
|
|
||||||
|
aio_err = None
|
||||||
|
|
||||||
|
def cancel_trio(task):
|
||||||
|
"""Cancel the calling ``trio`` task on error.
|
||||||
|
"""
|
||||||
|
nonlocal aio_err
|
||||||
|
try:
|
||||||
|
aio_err = task.exception()
|
||||||
|
except asyncio.CancelledError as cerr:
|
||||||
|
aio_err = cerr
|
||||||
|
|
||||||
|
if aio_err:
|
||||||
|
log.exception(f"asyncio task errorred:\n{aio_err}")
|
||||||
|
|
||||||
|
cancel_scope.cancel()
|
||||||
|
from_aio._err = aio_err
|
||||||
|
from_aio.close()
|
||||||
|
|
||||||
|
task.add_done_callback(cancel_trio)
|
||||||
|
|
||||||
|
return task, from_aio, to_trio, cancel_scope
|
||||||
|
|
||||||
|
|
||||||
|
async def run_task(
|
||||||
|
|
||||||
|
func: Callable,
|
||||||
|
*,
|
||||||
|
|
||||||
|
qsize: int = 2**10,
|
||||||
|
_treat_as_stream: bool = False,
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> Any:
|
||||||
|
"""Run an ``asyncio`` async function or generator in a task, return
|
||||||
|
or stream the result back to ``trio``.
|
||||||
|
|
||||||
|
"""
|
||||||
|
# streaming ``asyncio`` task
|
||||||
|
if _treat_as_stream:
|
||||||
|
|
||||||
|
task, from_aio, to_trio, cs = _run_asyncio_task(
|
||||||
|
func,
|
||||||
|
qsize=2**8,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
# naively expect the mem chan api to do the job
|
||||||
|
# of handling cross-framework cancellations / errors
|
||||||
|
return from_aio
|
||||||
|
|
||||||
|
# simple async func
|
||||||
|
try:
|
||||||
|
task, from_aio, to_trio, cs = _run_asyncio_task(
|
||||||
|
func,
|
||||||
|
qsize=1,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
# return single value
|
||||||
|
with cs:
|
||||||
|
return await from_aio.receive()
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
|
if not task.done():
|
||||||
|
task.cancel()
|
||||||
|
raise
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if from_aio._err:
|
||||||
|
raise from_aio._err
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: explicit api for the streaming case where
|
||||||
|
# we pull from the mem chan in an async generator?
|
||||||
|
# This ends up looking more like our ``Portal.open_stream_from()``
|
||||||
|
# NB: code below is untested.
|
||||||
|
|
||||||
|
# @asynccontextmanager
|
||||||
|
# async def stream_from_task(
|
||||||
|
|
||||||
|
# target: Callable[Any],
|
||||||
|
# **kwargs,
|
||||||
|
|
||||||
|
# ) -> AsyncIterator[Any]:
|
||||||
|
|
||||||
|
# from_aoi = await run_task(target, _treat_as_stream=True, **kwargs)
|
||||||
|
|
||||||
|
# with cancel_scope:
|
||||||
|
# # stream values upward
|
||||||
|
# async with from_aio:
|
||||||
|
# async for item in from_aio:
|
||||||
|
# yield item
|
||||||
|
|
||||||
|
|
||||||
|
def run_as_asyncio_guest(
|
||||||
|
trio_main: Callable,
|
||||||
|
) -> None:
|
||||||
|
"""Entry for an "infected ``asyncio`` actor".
|
||||||
|
|
||||||
|
Uh, oh. :o
|
||||||
|
|
||||||
|
It looks like your event loop has caught a case of the ``trio``s.
|
||||||
|
|
||||||
|
:()
|
||||||
|
|
||||||
|
Don't worry, we've heard you'll barely notice. You might hallucinate
|
||||||
|
a few more propagating errors and feel like your digestion has
|
||||||
|
slowed but if anything get's too bad your parents will know about
|
||||||
|
it.
|
||||||
|
|
||||||
|
:)
|
||||||
|
"""
|
||||||
|
async def aio_main(trio_main):
|
||||||
|
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
trio_done_fut = asyncio.Future()
|
||||||
|
|
||||||
|
def trio_done_callback(main_outcome):
|
||||||
|
|
||||||
|
log.info(f"trio_main finished: {main_outcome!r}")
|
||||||
|
trio_done_fut.set_result(main_outcome)
|
||||||
|
|
||||||
|
# start the infection: run trio on the asyncio loop in "guest mode"
|
||||||
|
log.info(f"Infecting asyncio process with {trio_main}")
|
||||||
|
|
||||||
|
trio.lowlevel.start_guest_run(
|
||||||
|
trio_main,
|
||||||
|
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||||
|
done_callback=trio_done_callback,
|
||||||
|
)
|
||||||
|
|
||||||
|
(await trio_done_fut).unwrap()
|
||||||
|
|
||||||
|
# might as well if it's installed.
|
||||||
|
try:
|
||||||
|
import uvloop
|
||||||
|
loop = uvloop.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
asyncio.run(aio_main(trio_main))
|
Loading…
Reference in New Issue