forked from goodboy/tractor
1
0
Fork 0

Merge pull request #52 from tgoodlet/contexts

Contexts and Pub-Sub
docs_example_fixes
goodboy 2019-01-25 00:49:07 -05:00 committed by GitHub
commit 977eaedb0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 865 additions and 228 deletions

View File

@ -2,7 +2,7 @@ language: python
matrix: matrix:
include: include:
- python: 3.6 # - python: 3.6
- python: 3.7 - python: 3.7
dist: xenial dist: xenial
sudo: required sudo: required

View File

@ -42,6 +42,9 @@ down. A great place to start is the `trio docs`_ and this `blog post`_.
.. _modern async Python: https://www.python.org/dev/peps/pep-0525/ .. _modern async Python: https://www.python.org/dev/peps/pep-0525/
.. contents::
Philosophy Philosophy
---------- ----------
``tractor``'s tenets non-comprehensively include: ``tractor``'s tenets non-comprehensively include:
@ -71,8 +74,12 @@ No PyPi release yet!
pip install git+git://github.com/tgoodlet/tractor.git pip install git+git://github.com/tgoodlet/tractor.git
Examples
--------
A trynamic first scene A trynamic first scene
---------------------- **********************
Let's direct a couple *actors* and have them run their lines for Let's direct a couple *actors* and have them run their lines for
the hip new film we're shooting: the hip new film we're shooting:
@ -104,6 +111,7 @@ the hip new film we're shooting:
donny = await n.run_in_actor( donny = await n.run_in_actor(
'donny', 'donny',
say_hello, say_hello,
# arguments are always named
other_actor='gretchen', other_actor='gretchen',
) )
gretchen = await n.run_in_actor( gretchen = await n.run_in_actor(
@ -129,7 +137,7 @@ this case our "director" executing ``main()``).
Actor spawning and causality Actor spawning and causality
---------------------------- ****************************
``tractor`` tries to take ``trio``'s concept of causal task lifetimes ``tractor`` tries to take ``trio``'s concept of causal task lifetimes
to multi-process land. Accordingly, ``tractor``'s *actor nursery* behaves to multi-process land. Accordingly, ``tractor``'s *actor nursery* behaves
similar to ``trio``'s nursery_. That is, ``tractor.open_nursery()`` similar to ``trio``'s nursery_. That is, ``tractor.open_nursery()``
@ -155,7 +163,7 @@ and use the ``run_in_actor()`` method:
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor('frank', movie_theatre_question) portal = await n.run_in_actor('teacher', cellar_door)
# The ``async with`` will unblock here since the 'frank' # The ``async with`` will unblock here since the 'frank'
# actor has completed its main task ``movie_theatre_question()``. # actor has completed its main task ``movie_theatre_question()``.
@ -186,7 +194,7 @@ What's going on?
much like you'd expect from a future_. much like you'd expect from a future_.
This ``run_in_actor()`` API should look very familiar to users of This ``run_in_actor()`` API should look very familiar to users of
``asyncio``'s run_in_executor_ which uses a ``concurrent.futures`` Executor_. ``asyncio``'s `run_in_executor()`_ which uses a ``concurrent.futures`` Executor_.
Since you might also want to spawn long running *worker* or *daemon* Since you might also want to spawn long running *worker* or *daemon*
actors, each actor's *lifetime* can be determined based on the spawn actors, each actor's *lifetime* can be determined based on the spawn
@ -199,7 +207,7 @@ method:
- actors can be spawned to *live forever* using the ``start_actor()`` - actors can be spawned to *live forever* using the ``start_actor()``
method and act like an RPC daemon that runs indefinitely (the method and act like an RPC daemon that runs indefinitely (the
``with tractor.open_nursery()`` wont' exit) until cancelled_ ``with tractor.open_nursery()`` won't exit) until cancelled_
Had we wanted the latter form in our example it would have looked like: Had we wanted the latter form in our example it would have looked like:
@ -251,12 +259,12 @@ to all others with ease over standard network protocols).
.. _nursery: https://trio.readthedocs.io/en/latest/reference-core.html#nurseries-and-spawning .. _nursery: https://trio.readthedocs.io/en/latest/reference-core.html#nurseries-and-spawning
.. _causal: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#causality .. _causal: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#causality
.. _cancelled: https://trio.readthedocs.io/en/latest/reference-core.html#child-tasks-and-cancellation .. _cancelled: https://trio.readthedocs.io/en/latest/reference-core.html#child-tasks-and-cancellation
.. _run_in_executor: https://docs.python.org/3/library/asyncio-eventloop.html#executor .. _run_in_executor(): https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
.. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
Transparent remote function calling using *portals* Async IPC using *portals*
--------------------------------------------------- *************************
``tractor`` introduces the concept of a *portal* which is an API ``tractor`` introduces the concept of a *portal* which is an API
borrowed_ from ``trio``. A portal may seem similar to the idea of borrowed_ from ``trio``. A portal may seem similar to the idea of
a RPC future_ except a *portal* allows invoking remote *async* functions and a RPC future_ except a *portal* allows invoking remote *async* functions and
@ -322,6 +330,9 @@ generator function running in a separate actor:
tractor.run(main) tractor.run(main)
A full fledged streaming service
********************************
Alright, let's get fancy. Alright, let's get fancy.
Say you wanted to spawn two actors which each pull data feeds from Say you wanted to spawn two actors which each pull data feeds from
@ -443,7 +454,7 @@ as ``multiprocessing`` calls it) which is running ``main()``.
Cancellation Cancellation
------------ ************
``tractor`` supports ``trio``'s cancellation_ system verbatim. ``tractor`` supports ``trio``'s cancellation_ system verbatim.
Cancelling a nursery block cancels all actors spawned by it. Cancelling a nursery block cancels all actors spawned by it.
Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``.
@ -452,7 +463,7 @@ Eventually ``tractor`` plans to support different `supervision strategies`_ like
Remote error propagation Remote error propagation
------------------------ ************************
Any task invoked in a remote actor should ship any error(s) back to the calling Any task invoked in a remote actor should ship any error(s) back to the calling
actor where it is raised and expected to be dealt with. This way remote actors actor where it is raised and expected to be dealt with. This way remote actors
are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself.
@ -494,12 +505,12 @@ a ``Supervisor`` type.
.. _erlang strategies: http://learnyousomeerlang.com/supervisors .. _erlang strategies: http://learnyousomeerlang.com/supervisors
Shared task state Actor local variables
----------------- *********************
Although ``tractor`` uses a *shared-nothing* architecture between processes Although ``tractor`` uses a *shared-nothing* architecture between processes
you can of course share state within an actor. ``trio`` tasks spawned via you can of course share state between tasks running *within* an actor.
multiple RPC calls to an actor can access global data using the per actor ``trio`` tasks spawned via multiple RPC calls to an actor can access global
``statespace`` dictionary: state using the per actor ``statespace`` dictionary:
.. code:: python .. code:: python
@ -528,7 +539,7 @@ out a state sharing system per-actor is totally up to you.
How do actors find each other (a poor man's *service discovery*)? How do actors find each other (a poor man's *service discovery*)?
----------------------------------------------------------------- *****************************************************************
Though it will be built out much more in the near future, ``tractor`` Though it will be built out much more in the near future, ``tractor``
currently keeps track of actors by ``(name: str, id: str)`` using a currently keeps track of actors by ``(name: str, id: str)`` using a
special actor called the *arbiter*. Currently the *arbiter* must exist special actor called the *arbiter*. Currently the *arbiter* must exist
@ -561,20 +572,67 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as
*first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``.
Using ``Channel`` directly (undocumented) Streaming using channels and contexts
----------------------------------------- *************************************
You can use the ``Channel`` api if necessary by simply defining a ``Channel`` is the API which wraps an underlying *transport* and *interchange*
``chan`` and ``cid`` *kwarg* in your async function definition. format to enable *inter-actor-communication*. In its present state ``tractor``
``tractor`` will treat such async functions like async generators on uses TCP and msgpack_.
the calling side (for now anyway) such that you can push stream values
a little more granularly if you find *yielding* values to be restrictive. If you aren't fond of having to write an async generator to stream data
I am purposely not documenting this feature with code because I'm not yet between actors (or need something more flexible) you can instead use a
sure yet how it should be used correctly. If you'd like more details ``Context``. A context wraps an actor-local spawned task and a ``Channel``
please feel free to ask me on the `trio gitter channel`_. so that tasks executing across multiple processes can stream data
to one another using a low level, request oriented API.
As an example if you wanted to create a streaming server without writing
an async generator that *yields* values you instead define an async
function:
.. code:: python
async def streamer(ctx, rate=2):
"""A simple web response streaming server.
"""
while True:
val = await web_request('http://data.feed.com')
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
Running actors standalone (without spawning) All that's required is declaring a ``ctx`` argument name somewhere in
-------------------------------------------- your function signature and ``tractor`` will treat the async function
like an async generator - as a streaming function from the client side.
This turns out to be handy particularly if you have
multiple tasks streaming responses concurrently:
.. code:: python
async def streamer(ctx, url, rate=2):
"""A simple web response streaming server.
"""
while True:
val = await web_request(url)
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
async def stream_multiple_sources(ctx, sources):
async with trio.open_nursery() as n:
for url in sources:
n.start_soon(streamer, ctx, url)
The context notion comes from the context_ in nanomsg_.
Running actors standalone
*************************
You don't have to spawn any actors using ``open_nursery()`` if you just You don't have to spawn any actors using ``open_nursery()`` if you just
want to run a single actor that connects to an existing cluster. want to run a single actor that connects to an existing cluster.
All the comms and arbiter registration stuff still works. This can All the comms and arbiter registration stuff still works. This can
@ -588,7 +646,7 @@ need to hop into a debugger. You just need to pass the existing
Enabling logging Enabling logging
---------------- ****************
Considering how complicated distributed software can become it helps to know Considering how complicated distributed software can become it helps to know
what exactly it's doing (even at the lowest levels). Luckily ``tractor`` has what exactly it's doing (even at the lowest levels). Luckily ``tractor`` has
tons of logging throughout the core. ``tractor`` isn't opinionated on tons of logging throughout the core. ``tractor`` isn't opinionated on
@ -616,12 +674,20 @@ Stuff I'd like to see ``tractor`` do real soon:
- an extensive `chaos engineering`_ test suite - an extensive `chaos engineering`_ test suite
- support for reactive programming primitives and native support for asyncitertools_ like libs - support for reactive programming primitives and native support for asyncitertools_ like libs
If you're interested in tackling any of these please do shout about it on the
`trio gitter channel`_! Feel like saying hi?
--------------------
This project is very much coupled to the ongoing development of
``trio`` (i.e. ``tractor`` gets all its ideas from that brilliant
community). If you want to help, have suggestions or just want to
say hi, please feel free to ping me on the `trio gitter channel`_!
.. _supervisors: https://github.com/tgoodlet/tractor/issues/22 .. _supervisors: https://github.com/tgoodlet/tractor/issues/22
.. _nanomsg: https://github.com/tgoodlet/tractor/issues/19 .. _nanomsg: https://nanomsg.github.io/nng/index.html
.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol .. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol
.. _trio gitter channel: https://gitter.im/python-trio/general .. _trio gitter channel: https://gitter.im/python-trio/general
.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html .. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html
.. _pdb++: https://github.com/antocuni/pdb .. _pdb++: https://github.com/antocuni/pdb
.. _msgpack: https://en.wikipedia.org/wiki/MessagePack

View File

@ -37,7 +37,8 @@ setup(
'tractor', 'tractor',
'tractor.testing', 'tractor.testing',
], ],
install_requires=['msgpack', 'trio>0.8', 'async_generator', 'colorlog'], install_requires=[
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.6", python_requires=">=3.6",
keywords=[ keywords=[

View File

@ -7,7 +7,6 @@ import pytest
import trio import trio
import tractor import tractor
from conftest import tractor_test from conftest import tractor_test

View File

@ -0,0 +1,187 @@
import time
from itertools import cycle
import pytest
import trio
import tractor
from async_generator import aclosing
from tractor.testing import tractor_test
def test_type_checks():
with pytest.raises(TypeError) as err:
@tractor.msg.pub
async def no_get_topics(yo):
yield
assert "must define a `get_topics`" in str(err.value)
with pytest.raises(TypeError) as err:
@tractor.msg.pub
def not_async_gen(yo):
pass
assert "must be an async generator function" in str(err.value)
def is_even(i):
return i % 2 == 0
@tractor.msg.pub
async def pubber(get_topics):
ss = tractor.current_actor().statespace
for i in cycle(range(10)):
# ensure topic subscriptions are as expected
ss['get_topics'] = get_topics
yield {'even' if is_even(i) else 'odd': i}
await trio.sleep(0.1)
async def subs(which, pub_actor_name):
if len(which) == 1:
if which[0] == 'even':
pred = is_even
else:
def pred(i):
return not is_even(i)
else:
def pred(i):
return isinstance(i, int)
async with tractor.find_actor(pub_actor_name) as portal:
agen = await portal.run(__name__, 'pubber', topics=which)
async with aclosing(agen) as agen:
async for pkt in agen:
for topic, value in pkt.items():
assert pred(value)
@tractor.msg.pub(tasks=['one', 'two'])
async def multilock_pubber(get_topics):
yield {'doggy': 10}
@pytest.mark.parametrize(
'callwith_expecterror',
[
(pubber, {}, TypeError),
# missing a `topics`
(multilock_pubber, {'ctx': None}, TypeError),
# missing a `task_name`
(multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError),
# should work
(multilock_pubber,
{'ctx': None, 'topics': ['topic1'], 'task_name': 'one'},
None),
],
)
@tractor_test
async def test_required_args(callwith_expecterror):
func, kwargs, err = callwith_expecterror
if err is not None:
with pytest.raises(err):
await func(**kwargs)
else:
async with tractor.open_nursery() as n:
# await func(**kwargs)
portal = await n.run_in_actor(
'sub', multilock_pubber, **kwargs)
async for val in await portal.result():
assert val == {'doggy': 10}
@pytest.mark.parametrize(
'pub_actor',
['streamer', 'arbiter']
)
def test_pubsub_multi_actor_subs(
loglevel,
arb_addr,
pub_actor,
):
"""Try out the neato @pub decorator system.
"""
async def main():
ss = tractor.current_actor().statespace
async with tractor.open_nursery() as n:
name = 'arbiter'
if pub_actor is 'streamer':
# start the publisher as a daemon
master_portal = await n.start_actor(
'streamer',
rpc_module_paths=[__name__],
)
even_portal = await n.run_in_actor(
'evens', subs, which=['even'], pub_actor_name=name)
odd_portal = await n.run_in_actor(
'odds', subs, which=['odd'], pub_actor_name=name)
async with tractor.wait_for_actor('evens'):
# block until 2nd actor is initialized
pass
if pub_actor is 'arbiter':
# wait for publisher task to be spawned in a local RPC task
while not ss.get('get_topics'):
await trio.sleep(0.1)
get_topics = ss.get('get_topics')
assert 'even' in get_topics()
async with tractor.wait_for_actor('odds'):
# block until 2nd actor is initialized
pass
if pub_actor is 'arbiter':
start = time.time()
while 'odd' not in get_topics():
await trio.sleep(0.1)
if time.time() - start > 1:
pytest.fail("odds subscription never arrived?")
# TODO: how to make this work when the arbiter gets
# a portal to itself? Currently this causes a hang
# when the channel server is torn down due to a lingering
# loopback channel
# with trio.move_on_after(1):
# await subs(['even', 'odd'])
# XXX: this would cause infinite
# blocking due to actor never terminating loop
# await even_portal.result()
await trio.sleep(0.5)
await even_portal.cancel_actor()
await trio.sleep(0.5)
if pub_actor is 'arbiter':
assert 'even' not in get_topics()
await odd_portal.cancel_actor()
await trio.sleep(1)
if pub_actor is 'arbiter':
while get_topics():
await trio.sleep(0.1)
if time.time() - start > 1:
pytest.fail("odds subscription never dropped?")
else:
await master_portal.cancel_actor()
tractor.run(
main,
arbiter_addr=arb_addr,
rpc_module_paths=[__name__],
)

View File

@ -1,6 +1,8 @@
""" """
RPC related RPC related
""" """
import itertools
import pytest import pytest
import tractor import tractor
import trio import trio
@ -12,6 +14,7 @@ async def sleep_back_actor(
func_defined, func_defined,
exposed_mods, exposed_mods,
): ):
if actor_name:
async with tractor.find_actor(actor_name) as portal: async with tractor.find_actor(actor_name) as portal:
try: try:
await portal.run(__name__, func_name) await portal.run(__name__, func_name)
@ -23,6 +26,8 @@ async def sleep_back_actor(
assert err.type is expect assert err.type is expect
raise raise
else:
await trio.sleep(float('inf'))
async def short_sleep(): async def short_sleep():
@ -31,19 +36,40 @@ async def short_sleep():
@pytest.mark.parametrize( @pytest.mark.parametrize(
'to_call', [ 'to_call', [
([], 'short_sleep'), ([], 'short_sleep', tractor.RemoteActorError),
([__name__], 'short_sleep'), ([__name__], 'short_sleep', tractor.RemoteActorError),
([__name__], 'fake_func'), ([__name__], 'fake_func', tractor.RemoteActorError),
(['tmp_mod'], 'import doggy', ModuleNotFoundError),
(['tmp_mod'], '4doggy', SyntaxError),
], ],
ids=['no_mods', 'this_mod', 'this_mod_bad_func'], ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
'fail_on_syntax'],
) )
def test_rpc_errors(arb_addr, to_call): def test_rpc_errors(arb_addr, to_call, testdir):
"""Test errors when making various RPC requests to an actor """Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define that either doesn't have the requested module exposed or doesn't define
the named function. the named function.
""" """
exposed_mods, funcname = to_call exposed_mods, funcname, inside_err = to_call
subactor_exposed_mods = []
func_defined = globals().get(funcname, False) func_defined = globals().get(funcname, False)
subactor_requests_to = 'arbiter'
remote_err = tractor.RemoteActorError
# remote module that fails at import time
if exposed_mods == ['tmp_mod']:
# create an importable module with a bad import
testdir.syspathinsert()
# module should cause raise a ModuleNotFoundError at import
testdir.makefile('.py', tmp_mod=funcname)
# no need to exposed module to the subactor
subactor_exposed_mods = exposed_mods
exposed_mods = []
func_defined = False
# subactor should not try to invoke anything
subactor_requests_to = None
remote_err = trio.MultiError
async def main(): async def main():
actor = tractor.current_actor() actor = tractor.current_actor()
@ -54,12 +80,13 @@ def test_rpc_errors(arb_addr, to_call):
await n.run_in_actor( await n.run_in_actor(
'subactor', 'subactor',
sleep_back_actor, sleep_back_actor,
actor_name=actor.name, actor_name=subactor_requests_to,
# function from this module the subactor will invoke # function from the local exposed module space
# when it RPCs back to this actor # the subactor will invoke when it RPCs back to this actor
func_name=funcname, func_name=funcname,
exposed_mods=exposed_mods, exposed_mods=exposed_mods,
func_defined=True if func_defined else False, func_defined=True if func_defined else False,
rpc_module_paths=subactor_exposed_mods,
) )
def run(): def run():
@ -73,8 +100,18 @@ def test_rpc_errors(arb_addr, to_call):
if exposed_mods and func_defined: if exposed_mods and func_defined:
run() run()
else: else:
# underlying errors are propogated upwards (yet) # underlying errors are propagated upwards (yet)
with pytest.raises(tractor.RemoteActorError) as err: with pytest.raises(remote_err) as err:
run() run()
assert err.value.type is tractor.RemoteActorError # get raw instance from pytest wrapper
value = err.value
# might get multiple `trio.Cancelled`s as well inside an inception
if isinstance(value, trio.MultiError):
value = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
value.exceptions
))
assert value.type is inside_err

View File

@ -11,13 +11,14 @@ import trio # type: ignore
from trio import MultiError from trio import MultiError
from .log import get_console_log, get_logger, get_loglevel from .log import get_console_log, get_logger, get_loglevel
from ._ipc import _connect_chan, Channel from ._ipc import _connect_chan, Channel, Context
from ._actor import ( from ._actor import (
Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor
) )
from ._trionics import open_nursery from ._trionics import open_nursery
from ._state import current_actor from ._state import current_actor
from ._exceptions import RemoteActorError, ModuleNotExposed from ._exceptions import RemoteActorError, ModuleNotExposed
from . import msg
__all__ = [ __all__ = [
@ -30,6 +31,7 @@ __all__ = [
'MultiError', 'MultiError',
'RemoteActorError', 'RemoteActorError',
'ModuleNotExposed', 'ModuleNotExposed',
'msg'
] ]

View File

@ -13,9 +13,13 @@ from typing import Dict, List, Tuple, Any, Optional, Union
import trio # type: ignore import trio # type: ignore
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
from ._ipc import Channel, _connect_chan from ._ipc import Channel, _connect_chan, Context
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from ._exceptions import pack_error, InternalActorError, ModuleNotExposed from ._exceptions import (
pack_error,
unpack_error,
ModuleNotExposed
)
from ._portal import ( from ._portal import (
Portal, Portal,
open_portal, open_portal,
@ -46,15 +50,13 @@ async def _invoke(
sig = inspect.signature(func) sig = inspect.signature(func)
treat_as_gen = False treat_as_gen = False
cs = None cs = None
if 'chan' in sig.parameters: ctx = Context(chan, cid)
assert 'cid' in sig.parameters, \ if 'ctx' in sig.parameters:
f"{func} must accept a `cid` (caller id) kwarg" kwargs['ctx'] = ctx
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent # TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator. # about what is considered a far-end async-generator.
# Right now both actual async gens and any async # Right now both actual async gens and any async
# function which declares a `chan` kwarg in its # function which declares a `ctx` kwarg in its
# signature will be treated as one. # signature will be treated as one.
treat_as_gen = True treat_as_gen = True
try: try:
@ -138,19 +140,15 @@ async def _invoke(
task_status.started(err) task_status.started(err)
finally: finally:
# RPC task bookeeping # RPC task bookeeping
tasks = actor._rpc_tasks.get(chan, None)
if tasks:
try: try:
scope, func = tasks.pop(cid) scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
except ValueError: is_complete.set()
except KeyError:
# If we're cancelled before the task returns then the # If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
log.warn( log.warn(
f"Task {func} was likely cancelled before it was started") f"Task {func} was likely cancelled before it was started")
if not tasks:
actor._rpc_tasks.pop(chan, None)
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.info(f"All RPC tasks have completed") log.info(f"All RPC tasks have completed")
actor._no_more_rpc_tasks.set() actor._no_more_rpc_tasks.set()
@ -195,9 +193,10 @@ class Actor:
self._no_more_rpc_tasks = trio.Event() self._no_more_rpc_tasks = trio.Event()
self._no_more_rpc_tasks.set() self._no_more_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: Dict[ self._rpc_tasks: Dict[
Channel, Tuple[Channel, str],
Dict[str, Tuple[trio._core._run.CancelScope, typing.Callable]] Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event]
] = {} ] = {}
# map {uids -> {callids -> waiter queues}} # map {uids -> {callids -> waiter queues}}
self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {}
@ -225,17 +224,9 @@ class Actor:
code (if it exists). code (if it exists).
""" """
for path in self.rpc_module_paths: for path in self.rpc_module_paths:
log.debug(f"Attempting to import {path}")
self._mods[path] = importlib.import_module(path) self._mods[path] = importlib.import_module(path)
# XXX: triggers an internal error which can cause a hanging
# problem (without the recently added .throw()) on teardown
# (root nursery tears down thus killing all channels before
# sending cancels to subactors during actor nursery teardown
# - has to do with await main() in MainProcess)
# if self.name == 'gretchen':
# self._mods.pop('test_discovery')
# TODO: how to test the above?
def _get_rpc_func(self, ns, funcname): def _get_rpc_func(self, ns, funcname):
try: try:
return getattr(self._mods[ns], funcname) return getattr(self._mods[ns], funcname)
@ -274,7 +265,7 @@ class Actor:
log.warning( log.warning(
f"already have channel(s) for {uid}:{chans}?" f"already have channel(s) for {uid}:{chans}?"
) )
log.debug(f"Registered {chan} for {uid}") log.trace(f"Registered {chan} for {uid}") # type: ignore
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
@ -301,10 +292,12 @@ class Actor:
if chan.connected(): if chan.connected():
log.debug(f"Disconnecting channel {chan}") log.debug(f"Disconnecting channel {chan}")
try: try:
# send our msg loop terminate sentinel
await chan.send(None) await chan.send(None)
await chan.aclose() # await chan.aclose()
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.exception(f"Channel for {chan.uid} was already zonked..") log.exception(
f"Channel for {chan.uid} was already zonked..")
async def _push_result(self, actorid, cid: str, msg: dict) -> None: async def _push_result(self, actorid, cid: str, msg: dict) -> None:
"""Push an RPC result to the local consumer's queue. """Push an RPC result to the local consumer's queue.
@ -339,7 +332,10 @@ class Actor:
return cid, q return cid, q
async def _process_messages( async def _process_messages(
self, chan: Channel, treat_as_gen: bool = False self, chan: Channel,
treat_as_gen: bool = False,
shield: bool = False,
task_status=trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Process messages for the channel async-RPC style. """Process messages for the channel async-RPC style.
@ -347,33 +343,30 @@ class Actor:
""" """
# TODO: once https://github.com/python-trio/trio/issues/467 gets # TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that! # worked out we'll likely want to use that!
msg = None
log.debug(f"Entering msg loop for {chan} from {chan.uid}") log.debug(f"Entering msg loop for {chan} from {chan.uid}")
try: try:
# internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task)
with trio.open_cancel_scope(shield=shield) as cs:
task_status.started(cs)
async for msg in chan: async for msg in chan:
if msg is None: # terminate sentinel if msg is None: # loop terminate sentinel
log.debug( log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
for cid, (scope, func) in self._rpc_tasks.pop( for (channel, cid) in self._rpc_tasks:
chan, {} if channel is chan:
).items(): self.cancel_task(cid, Context(channel, cid))
scope.cancel()
log.debug( log.debug(
f"Msg loop signalled to terminate for" f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}") f" {chan} from {chan.uid}")
break break
log.debug(f"Received msg {msg} from {chan.uid}") log.debug(f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid') cid = msg.get('cid')
if cid: if cid:
cancel = msg.get('cancel')
if cancel:
# right now this is only implicitly used by
# async generator IPC
scope, func = self._rpc_tasks[chan][cid]
log.debug(
f"Received cancel request for task {cid}"
f" from {chan.uid}")
scope.cancel()
else:
# deliver response to local caller/waiter # deliver response to local caller/waiter
await self._push_result(chan.uid, cid, msg) await self._push_result(chan.uid, cid, msg)
log.debug( log.debug(
@ -389,12 +382,8 @@ class Actor:
# (i.e. no cid was provided in the msg - see above). # (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers # Push this error to all local channel consumers
# (normally portals) by marking the channel as errored # (normally portals) by marking the channel as errored
tb_str = msg.get('tb_str')
assert chan.uid assert chan.uid
exc = InternalActorError( exc = unpack_error(msg, chan=chan)
f"{chan.uid}\n" + tb_str,
**msg,
)
chan._exc = exc chan._exc = exc
raise exc raise exc
@ -423,14 +412,16 @@ class Actor:
# deadlock and other weird behaviour) # deadlock and other weird behaviour)
if func != self.cancel: if func != self.cancel:
if isinstance(cs, Exception): if isinstance(cs, Exception):
log.warn(f"Task for RPC func {func} failed with {cs}") log.warn(f"Task for RPC func {func} failed with"
f"{cs}")
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
self._no_more_rpc_tasks.clear() self._no_more_rpc_tasks.clear()
log.info(f"RPC func is {func}") log.info(f"RPC func is {func}")
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
# cancelled gracefully if requested # cancelled gracefully if requested
self._rpc_tasks.setdefault(chan, {})[cid] = (cs, func) self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
log.debug( log.debug(
f"Waiting on next msg for {chan} from {chan.uid}") f"Waiting on next msg for {chan} from {chan.uid}")
else: else:
@ -448,8 +439,14 @@ class Actor:
raise raise
# if this is the `MainProcess` we expect the error broadcasting # if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints" # above to trigger an error at consuming portal "checkpoints"
except trio.Cancelled:
# debugging only
log.debug("Msg loop was cancelled")
raise
finally: finally:
log.debug(f"Exiting msg loop for {chan} from {chan.uid}") log.debug(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
def _fork_main( def _fork_main(
self, self,
@ -462,6 +459,8 @@ class Actor:
self._forkserver_info = forkserver_info self._forkserver_info = forkserver_info
from ._trionics import ctx from ._trionics import ctx
if self.loglevel is not None: if self.loglevel is not None:
log.info(
f"Setting loglevel for {self.uid} to {self.loglevel}")
get_console_log(self.loglevel) get_console_log(self.loglevel)
log.info( log.info(
f"Started new {ctx.current_process()} for {self.uid}") f"Started new {ctx.current_process()} for {self.uid}")
@ -493,9 +492,6 @@ class Actor:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
self._root_nursery = nursery self._root_nursery = nursery
# load allowed RPC module
self.load_modules()
# Startup up channel server # Startup up channel server
host, port = accept_addr host, port = accept_addr
await nursery.start(partial( await nursery.start(partial(
@ -524,6 +520,11 @@ class Actor:
nursery.start_soon( nursery.start_soon(
self._process_messages, self._parent_chan) self._process_messages, self._parent_chan)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing connection to parent
# so that import errors are properly propagated upwards
self.load_modules()
# register with the arbiter if we're told its addr # register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`") log.debug(f"Registering {self} for role `{self.name}`")
async with get_arbiter(*arbiter_addr) as arb_portal: async with get_arbiter(*arbiter_addr) as arb_portal:
@ -546,8 +547,7 @@ class Actor:
if self._parent_chan: if self._parent_chan:
try: try:
# internal error so ship to parent without cid # internal error so ship to parent without cid
await self._parent_chan.send( await self._parent_chan.send(pack_error(err))
pack_error(err))
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error( log.error(
f"Failed to ship error to parent " f"Failed to ship error to parent "
@ -632,18 +632,46 @@ class Actor:
self.cancel_server() self.cancel_server()
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
async def cancel_task(self, cid, ctx):
"""Cancel a local task.
Note this method will be treated as a streaming funciton
by remote actor-callers due to the declaration of ``ctx``
in the signature (for now).
"""
# right now this is only implicitly called by
# streaming IPC but it should be called
# to cancel any remotely spawned task
chan = ctx.chan
# the ``dict.get()`` ensures the requested task to be cancelled
# was indeed spawned by a request from this channel
scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)]
log.debug(
f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
# don't allow cancelling this function mid-execution
# (is this necessary?)
if func is self.cancel_task:
return
scope.cancel()
# wait for _invoke to mark the task complete
await is_complete.wait()
log.debug(
f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
async def cancel_rpc_tasks(self) -> None: async def cancel_rpc_tasks(self) -> None:
"""Cancel all existing RPC responder tasks using the cancel scope """Cancel all existing RPC responder tasks using the cancel scope
registered for each. registered for each.
""" """
tasks = self._rpc_tasks tasks = self._rpc_tasks
log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for chan, cids2scopes in tasks.items(): for (chan, cid) in tasks.copy():
log.debug(f"Cancelling all tasks for {chan.uid}") # TODO: this should really done in a nursery batch
for cid, (scope, func) in cids2scopes.items(): await self.cancel_task(cid, Context(chan, cid))
log.debug(f"Cancelling task for {func}") # if tasks:
scope.cancel()
if tasks:
log.info( log.info(
f"Waiting for remaining rpc tasks to complete {tasks}") f"Waiting for remaining rpc tasks to complete {tasks}")
await self._no_more_rpc_tasks.wait() await self._no_more_rpc_tasks.wait()
@ -815,7 +843,9 @@ async def find_actor(
sockaddr = await arb_portal.run('self', 'find_actor', name=name) sockaddr = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if sockaddr: if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
elif sockaddr:
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal

View File

@ -39,7 +39,7 @@ class NoResult(RuntimeError):
"No final result is expected for this actor" "No final result is expected for this actor"
class ModuleNotExposed(RuntimeError): class ModuleNotExposed(ModuleNotFoundError):
"The requested module is not exposed for RPC" "The requested module is not exposed for RPC"
@ -55,12 +55,12 @@ def pack_error(exc):
} }
def unpack_error(msg, chan=None): def unpack_error(msg, chan=None, err_type=RemoteActorError):
"""Unpack an 'error' message from the wire """Unpack an 'error' message from the wire
into a local ``RemoteActorError``. into a local ``RemoteActorError``.
""" """
tb_str = msg['error'].get('tb_str', '') tb_str = msg['error'].get('tb_str', '')
return RemoteActorError( return err_type(
f"{chan.uid}\n" + tb_str, f"{chan.uid}\n" + tb_str,
**msg['error'], **msg['error'],
) )

View File

@ -1,6 +1,7 @@
""" """
Inter-process comms abstractions Inter-process comms abstractions
""" """
from dataclasses import dataclass
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional
@ -205,6 +206,28 @@ class Channel:
return self.squeue.connected() if self.squeue else False return self.squeue.connected() if self.squeue else False
@dataclass(frozen=True)
class Context:
"""An IAC (inter-actor communication) context.
Allows maintaining task or protocol specific state between communicating
actors. A unique context is created on the receiving end for every request
to a remote actor.
"""
chan: Channel
cid: str
# TODO: we should probably attach the actor-task
# cancel scope here now that trio is exposing it
# as a public object
async def send_yield(self, data: Any) -> None:
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
@asynccontextmanager @asynccontextmanager
async def _connect_chan( async def _connect_chan(
host: str, port: int host: str, port: int

View File

@ -5,9 +5,11 @@ import importlib
import inspect import inspect
import typing import typing
from typing import Tuple, Any, Dict, Optional, Set from typing import Tuple, Any, Dict, Optional, Set
from functools import partial
from dataclasses import dataclass
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager, aclosing
from ._state import current_actor from ._state import current_actor
from ._ipc import Channel from ._ipc import Channel
@ -53,7 +55,7 @@ class Portal:
underlying ``tractor.Channel`` as though the remote (async) underlying ``tractor.Channel`` as though the remote (async)
function / generator was invoked locally. function / generator was invoked locally.
Think of this like an native async IPC API. Think of this like a native async IPC API.
""" """
def __init__(self, channel: Channel) -> None: def __init__(self, channel: Channel) -> None:
self.channel = channel self.channel = channel
@ -124,7 +126,7 @@ class Portal:
# to make async-generators the fundamental IPC API over channels! # to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff) # (think `yield from`, `gen.send()`, and functional reactive stuff)
if resptype == 'yield': if resptype == 'yield': # stream response
async def yield_from_q(): async def yield_from_q():
try: try:
@ -140,12 +142,26 @@ class Portal:
"Received internal error at portal?") "Received internal error at portal?")
raise unpack_error(msg, self.channel) raise unpack_error(msg, self.channel)
except GeneratorExit: except (GeneratorExit, trio.Cancelled):
# for now this msg cancels an ongoing remote task log.warning(
await self.channel.send({'cancel': True, 'cid': cid})
log.warn(
f"Cancelling async gen call {cid} to " f"Cancelling async gen call {cid} to "
f"{self.channel.uid}") f"{self.channel.uid}")
with trio.move_on_after(0.5) as cs:
cs.shield = True
# TODO: yeah.. it'd be nice if this was just an
# async func on the far end. Gotta figure out a
# better way then implicitly feeding the ctx
# to declaring functions; likely a decorator
# sytsem.
agen = await self.run('self', 'cancel_task', cid=cid)
async with aclosing(agen) as agen:
async for _ in agen:
pass
if cs.cancelled_caught:
if not self.channel.connected():
log.warning(
"May have failed to cancel remote task "
f"{cid} for {self.channel.uid}")
raise raise
# TODO: use AsyncExitStack to aclose() all agens # TODO: use AsyncExitStack to aclose() all agens
@ -154,7 +170,7 @@ class Portal:
self._agens.add(agen) self._agens.add(agen)
return agen return agen
elif resptype == 'return': elif resptype == 'return': # single response
msg = await q.get() msg = await q.get()
try: try:
return msg['return'] return msg['return']
@ -176,7 +192,7 @@ class Portal:
# not expecting a "main" result # not expecting a "main" result
if self._expect_result is None: if self._expect_result is None:
log.warn( log.warning(
f"Portal for {self.channel.uid} not expecting a final" f"Portal for {self.channel.uid} not expecting a final"
" result?\nresult() should only be called if subactor" " result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`") " was spawned with `ActorNursery.run_in_actor()`")
@ -198,44 +214,53 @@ class Portal:
return self._result return self._result
async def _cancel_streams(self):
# terminate all locally running async generator
# IPC calls
if self._agens:
log.warning(
f"Cancelling all streams with {self.channel.uid}")
for agen in self._agens:
await agen.aclose()
async def close(self) -> None: async def close(self) -> None:
# trigger remote msg loop `break` await self._cancel_streams()
chan = self.channel
log.debug(f"Closing portal for {chan} to {chan.uid}")
await self.channel.send(None)
async def cancel_actor(self) -> bool: async def cancel_actor(self) -> bool:
"""Cancel the actor on the other end of this portal. """Cancel the actor on the other end of this portal.
""" """
if not self.channel.connected():
log.warning("This portal is already closed can't cancel")
return False
await self._cancel_streams()
log.warning( log.warning(
f"Sending cancel request to {self.channel.uid} on " f"Sending actor cancel request to {self.channel.uid} on "
f"{self.channel}") f"{self.channel}")
try: try:
with trio.move_on_after(0.1) as cancel_scope:
cancel_scope.shield = True
# send cancel cmd - might not get response # send cancel cmd - might not get response
with trio.move_on_after(0.5) as cancel_scope:
cancel_scope.shield = True
await self.run('self', 'cancel') await self.run('self', 'cancel')
return True return True
if cancel_scope.cancelled_caught:
log.warning(f"May have failed to cancel {self.channel.uid}")
return False
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.warning( log.warning(
f"{self.channel} for {self.channel.uid} was already closed?") f"{self.channel} for {self.channel.uid} was already closed?")
return False return False
else:
log.warning(f"May have failed to cancel {self.channel.uid}")
return False
@dataclass
class LocalPortal: class LocalPortal:
"""A 'portal' to a local ``Actor``. """A 'portal' to a local ``Actor``.
A compatibility shim for normal portals but for invoking functions A compatibility shim for normal portals but for invoking functions
using an in process actor instance. using an in process actor instance.
""" """
def __init__(
self,
actor: 'Actor' # type: ignore actor: 'Actor' # type: ignore
) -> None:
self.actor = actor
async def run(self, ns: str, func_name: str, **kwargs) -> Any: async def run(self, ns: str, func_name: str, **kwargs) -> Any:
"""Run a requested function locally and return it's result. """Run a requested function locally and return it's result.
@ -270,20 +295,26 @@ async def open_portal(
if channel.uid is None: if channel.uid is None:
await _do_handshake(actor, channel) await _do_handshake(actor, channel)
nursery.start_soon(actor._process_messages, channel) msg_loop_cs = await nursery.start(
partial(
actor._process_messages,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends
shield=True,
)
)
portal = Portal(channel) portal = Portal(channel)
try: try:
yield portal yield portal
finally: finally:
# tear down all async generators
for agen in portal._agens:
await agen.aclose()
# cancel remote channel-msg loop
if channel.connected():
await portal.close() await portal.close()
# cancel background msg loop task
nursery.cancel_scope.cancel()
if was_connected: if was_connected:
await channel.aclose() # cancel remote channel-msg loop
await channel.send(None)
# cancel background msg loop task
msg_loop_cs.cancel()
nursery.cancel_scope.cancel()

View File

@ -114,7 +114,7 @@ class ActorNursery:
name: str, name: str,
fn: typing.Callable, fn: typing.Callable,
bind_addr: Tuple[str, int] = ('127.0.0.1', 0), bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
rpc_module_paths: List[str] = None, rpc_module_paths: List[str] = [],
statespace: Dict[str, Any] = None, statespace: Dict[str, Any] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``
@ -129,7 +129,7 @@ class ActorNursery:
mod_path = fn.__module__ mod_path = fn.__module__
portal = await self.start_actor( portal = await self.start_actor(
name, name,
rpc_module_paths=[mod_path], rpc_module_paths=[mod_path] + rpc_module_paths,
bind_addr=bind_addr, bind_addr=bind_addr,
statespace=statespace, statespace=statespace,
) )
@ -317,7 +317,7 @@ class ActorNursery:
# the `else:` block here might not complete? # the `else:` block here might not complete?
# For now, shield both. # For now, shield both.
with trio.open_cancel_scope(shield=True): with trio.open_cancel_scope(shield=True):
if etype is trio.Cancelled: if etype in (trio.Cancelled, KeyboardInterrupt):
log.warning( log.warning(
f"Nursery for {current_actor().uid} was " f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}") f"cancelled with {etype}")

261
tractor/msg.py 100644
View File

@ -0,0 +1,261 @@
"""
Messaging pattern APIs and helpers.
"""
import inspect
import typing
from typing import Dict, Any, Set, Union, Callable
from functools import partial
from async_generator import aclosing
import trio
import wrapt
from .log import get_logger
from . import current_actor
from ._ipc import Context
__all__ = ['pub']
log = get_logger('messaging')
async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: Dict[str, set],
packetizer: typing.Callable = None,
) -> None:
"""Request and fan out quotes to each subscribed actor channel.
"""
def get_topics():
return tuple(topics2ctxs.keys())
agen = pub_async_gen_func(get_topics=get_topics)
async with aclosing(agen) as pub_gen:
async for published in pub_gen:
ctx_payloads: Dict[str, Any] = {}
for topic, data in published.items():
log.debug(f"publishing {topic, data}")
# build a new dict packet or invoke provided packetizer
if packetizer is None:
packet = {topic: data}
else:
packet = packetizer(topic, data)
for ctx in topics2ctxs.get(topic, set()):
ctx_payloads.setdefault(ctx, {}).update(packet),
# deliver to each subscriber (fan out)
if ctx_payloads:
for ctx, payload in ctx_payloads.items():
try:
await ctx.send_yield(payload)
except (
# That's right, anything you can think of...
trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError,
):
log.warning(f"{ctx.chan} went down?")
for ctx_set in topics2ctxs.values():
ctx_set.discard(ctx)
if not get_topics():
log.warning(f"No subscribers left for {pub_gen}")
break
def modify_subs(topics2ctxs, topics, ctx):
"""Absolute symbol subscription list for each quote stream.
Effectively a symbol subscription api.
"""
log.info(f"{ctx.chan.uid} changed subscription to {topics}")
# update map from each symbol to requesting client's chan
for topic in topics:
topics2ctxs.setdefault(topic, set()).add(ctx)
# remove any existing symbol subscriptions if symbol is not
# found in ``symbols``
# TODO: this can likely be factored out into the pub-sub api
for topic in filter(
lambda topic: topic not in topics, topics2ctxs.copy()
):
ctx_set = topics2ctxs.get(topic)
ctx_set.discard(ctx)
if not ctx_set:
# pop empty sets which will trigger bg quoter task termination
topics2ctxs.pop(topic)
def pub(
wrapped: typing.Callable = None,
*,
tasks: Set[str] = set(),
):
"""Publisher async generator decorator.
A publisher can be called multiple times from different actors
but will only spawn a finite set of internal tasks to stream values
to each caller. The ``tasks` argument to the decorator (``Set[str]``)
specifies the names of the mutex set of publisher tasks.
When the publisher function is called, an argument ``task_name`` must be
passed to specify which task (of the set named in ``tasks``) should be
used. This allows for using the same publisher with different input
(arguments) without allowing more concurrent tasks then necessary.
Values yielded from the decorated async generator
must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the
topic string an determines which subscription the packet will be delivered
to and the value is a packet ``Dict[str, Any]`` by default of the form:
.. ::python
{topic: value}
The caller can instead opt to pass a ``packetizer`` callback who's return
value will be delivered as the published response.
The decorated function must *accept* an argument :func:`get_topics` which
dynamically returns the tuple of current subscriber topics:
.. code:: python
from tractor.msg import pub
@pub(tasks={'source_1', 'source_2'})
async def pub_service(get_topics):
data = await web_request(endpoints=get_topics())
for item in data:
yield data['key'], data
The publisher must be called passing in the following arguments:
- ``topics: Set[str]`` the topic sequence or "subscriptions"
- ``task_name: str`` the task to use (if ``tasks`` was passed)
- ``ctx: Context`` the tractor context (only needed if calling the
pub func without a nursery, otherwise this is provided implicitly)
- packetizer: ``Callable[[str, Any], Any]`` a callback who receives
the topic and value from the publisher function each ``yield`` such that
whatever is returned is sent as the published value to subscribers of
that topic. By default this is a dict ``{topic: value}``.
As an example, to make a subscriber call the above function:
.. code:: python
from functools import partial
import tractor
async with tractor.open_nursery() as n:
portal = n.run_in_actor(
'publisher', # actor name
partial( # func to execute in it
pub_service,
topics=('clicks', 'users'),
task_name='source1',
)
)
async for value in portal.result():
print(f"Subscriber received {value}")
Here, you don't need to provide the ``ctx`` argument since the remote actor
provides it automatically to the spawned task. If you were to call
``pub_service()`` directly from a wrapping function you would need to
provide this explicitly.
Remember you only need this if you need *a finite set of tasks* running in
a single actor to stream data to an arbitrary number of subscribers. If you
are ok to have a new task running for every call to ``pub_service()`` then
probably don't need this.
"""
# handle the decorator not called with () case
if wrapped is None:
return partial(pub, tasks=tasks)
task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {
None: trio.StrictFIFOLock()}
for name in tasks:
task2lock[name] = trio.StrictFIFOLock()
async def takes_ctx(get_topics, ctx=None):
pass
@wrapt.decorator(adapter=takes_ctx)
async def wrapper(agen, instance, args, kwargs):
# this is used to extract arguments properly as per
# the `wrapt` docs
async def _execute(
ctx: Context,
topics: Set[str],
*args,
# *,
task_name: str = None,
packetizer: Callable = None,
**kwargs,
):
if tasks and task_name is None:
raise TypeError(
f"{agen} must be called with a `task_name` named "
f"argument with a falue from {tasks}")
ss = current_actor().statespace
lockmap = ss.setdefault('_pubtask2lock', task2lock)
lock = lockmap[task_name]
all_subs = ss.setdefault('_subs', {})
topics2ctxs = all_subs.setdefault(task_name, {})
try:
modify_subs(topics2ctxs, topics, ctx)
# block and let existing feed task deliver
# stream data until it is cancelled in which case
# the next waiting task will take over and spawn it again
async with lock:
# no data feeder task yet; so start one
respawn = True
while respawn:
respawn = False
log.info(
f"Spawning data feed task for {funcname}")
try:
# unblocks when no more symbols subscriptions exist
# and the streamer task terminates
await fan_out_to_ctxs(
pub_async_gen_func=partial(
agen, *args, **kwargs),
topics2ctxs=topics2ctxs,
packetizer=packetizer,
)
log.info(
f"Terminating stream task {task_name or ''}"
f" for {agen.__name__}")
except trio.BrokenResourceError:
log.exception("Respawning failed data feed task")
respawn = True
finally:
# remove all subs for this context
modify_subs(topics2ctxs, (), ctx)
# if there are truly no more subscriptions with this broker
# drop from broker subs dict
if not any(topics2ctxs.values()):
log.info(
f"No more subscriptions for publisher {task_name}")
# invoke it
await _execute(*args, **kwargs)
funcname = wrapped.__name__
if not inspect.isasyncgenfunction(wrapped):
raise TypeError(
f"Publisher {funcname} must be an async generator function"
)
if 'get_topics' not in inspect.signature(wrapped).parameters:
raise TypeError(
f"Publisher async gen {funcname} must define a "
"`get_topics` argument"
)
return wrapper(wrapped)