Merge pull request #73 from goodboy/stream_functions

Stream functions
user_update
goodboy 2019-03-29 19:41:50 -04:00 committed by GitHub
commit e0f4894071
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 433 additions and 322 deletions

View File

@ -280,8 +280,60 @@ to all others with ease over standard network protocols).
.. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
Async IPC using *portals* Cancellation
************************* ************
``tractor`` supports ``trio``'s cancellation_ system verbatim.
Cancelling a nursery block cancels all actors spawned by it.
Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``.
.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags
Remote error propagation
************************
Any task invoked in a remote actor should ship any error(s) back to the calling
actor where it is raised and expected to be dealt with. This way remote actors
are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself.
.. code:: python
async def assert_err():
assert 0
async def main():
async with tractor.open_nursery() as n:
real_actors = []
for i in range(3):
real_actors.append(await n.start_actor(
f'actor_{i}',
rpc_module_paths=[__name__],
))
# start one actor that will fail immediately
await n.run_in_actor('extra', assert_err)
# should error here with a ``RemoteActorError`` containing
# an ``AssertionError`` and all the other actors have been cancelled
try:
# also raises
tractor.run(main)
except tractor.RemoteActorError:
print("Look Maa that actor failed hard, hehhh!")
You'll notice the nursery cancellation conducts a *one-cancels-all*
supervisory strategy `exactly like trio`_. The plan is to add more
`erlang strategies`_ in the near future by allowing nurseries to accept
a ``Supervisor`` type.
.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics
.. _erlang strategies: http://learnyousomeerlang.com/supervisors
IPC using *portals*
*******************
``tractor`` introduces the concept of a *portal* which is an API ``tractor`` introduces the concept of a *portal* which is an API
borrowed_ from ``trio``. A portal may seem similar to the idea of borrowed_ from ``trio``. A portal may seem similar to the idea of
a RPC future_ except a *portal* allows invoking remote *async* functions and a RPC future_ except a *portal* allows invoking remote *async* functions and
@ -305,10 +357,26 @@ channels_ system or shipping code over the network.
This *portal* approach turns out to be paricularly exciting with the This *portal* approach turns out to be paricularly exciting with the
introduction of `asynchronous generators`_ in Python 3.6! It means that introduction of `asynchronous generators`_ in Python 3.6! It means that
actors can compose nicely in a data processing pipeline. actors can compose nicely in a data streaming pipeline.
As an example here's an actor that streams for 1 second from a remote async
generator function running in a separate actor: Streaming
*********
By now you've figured out that ``tractor`` lets you spawn process based
*actors* that can invoke cross-process (async) functions and all with
structured concurrency built in. But the **real cool stuff** is the
native support for cross-process *streaming*.
Asynchronous generators
+++++++++++++++++++++++
The default streaming function is simply an async generator definition.
Every value *yielded* from the generator is delivered to the calling
portal exactly like if you had invoked the function in-process meaning
you can ``async for`` to receive each value on the calling side.
As an example here's a parent actor that streams for 1 second from a
spawned subactor:
.. code:: python .. code:: python
@ -346,10 +414,87 @@ generator function running in a separate actor:
tractor.run(main) tractor.run(main)
By default async generator functions are treated as inter-actor
*streams* when invoked via a portal (how else could you really interface
with them anyway) so no special syntax to denote the streaming *service*
is necessary.
Channels and Contexts
+++++++++++++++++++++
If you aren't fond of having to write an async generator to stream data
between actors (or need something more flexible) you can instead use
a ``Context``. A context wraps an actor-local spawned task and
a ``Channel`` so that tasks executing across multiple processes can
stream data to one another using a low level, request oriented API.
A ``Channel`` wraps an underlying *transport* and *interchange* format
to enable *inter-actor-communication*. In its present state ``tractor``
uses TCP and msgpack_.
As an example if you wanted to create a streaming server without writing
an async generator that *yields* values you instead define a decorated
async function:
.. code:: python
@tractor.stream
async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request('http://data.feed.com')
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
You must decorate the function with ``@tractor.stream`` and declare
a ``ctx`` argument as the first in your function signature and then
``tractor`` will treat the async function like an async generator - as
a stream from the calling/client side.
This turns out to be handy particularly if you have multiple tasks
pushing responses concurrently:
.. code:: python
async def streamer(
ctx: tractor.Context,
rate: int = 2
) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request(url)
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
@tractor.stream
async def stream_multiple_sources(
ctx: tractor.Context,
sources: List[str]
) -> None:
async with trio.open_nursery() as n:
for url in sources:
n.start_soon(streamer, ctx, url)
The context notion comes from the context_ in nanomsg_.
.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
.. _msgpack: https://en.wikipedia.org/wiki/MessagePack
A full fledged streaming service A full fledged streaming service
******************************** ++++++++++++++++++++++++++++++++
Alright, let's get fancy. Alright, let's get fancy.
Say you wanted to spawn two actors which each pull data feeds from Say you wanted to spawn two actors which each pull data feeds from
@ -471,58 +616,6 @@ as ``multiprocessing`` calls it) which is running ``main()``.
.. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i
Cancellation
************
``tractor`` supports ``trio``'s cancellation_ system verbatim.
Cancelling a nursery block cancels all actors spawned by it.
Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``.
.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags
Remote error propagation
************************
Any task invoked in a remote actor should ship any error(s) back to the calling
actor where it is raised and expected to be dealt with. This way remote actors
are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself.
.. code:: python
async def assert_err():
assert 0
async def main():
async with tractor.open_nursery() as n:
real_actors = []
for i in range(3):
real_actors.append(await n.start_actor(
f'actor_{i}',
rpc_module_paths=[__name__],
))
# start one actor that will fail immediately
await n.run_in_actor('extra', assert_err)
# should error here with a ``RemoteActorError`` containing
# an ``AssertionError`` and all the other actors have been cancelled
try:
# also raises
tractor.run(main)
except tractor.RemoteActorError:
print("Look Maa that actor failed hard, hehhh!")
You'll notice the nursery cancellation conducts a *one-cancels-all*
supervisory strategy `exactly like trio`_. The plan is to add more
`erlang strategies`_ in the near future by allowing nurseries to accept
a ``Supervisor`` type.
.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics
.. _erlang strategies: http://learnyousomeerlang.com/supervisors
Actor local variables Actor local variables
********************* *********************
Although ``tractor`` uses a *shared-nothing* architecture between processes Although ``tractor`` uses a *shared-nothing* architecture between processes
@ -556,8 +649,8 @@ a convenience for passing simple data to newly spawned actors); building
out a state sharing system per-actor is totally up to you. out a state sharing system per-actor is totally up to you.
How do actors find each other (a poor man's *service discovery*)? Service Discovery
***************************************************************** *****************
Though it will be built out much more in the near future, ``tractor`` Though it will be built out much more in the near future, ``tractor``
currently keeps track of actors by ``(name: str, id: str)`` using a currently keeps track of actors by ``(name: str, id: str)`` using a
special actor called the *arbiter*. Currently the *arbiter* must exist special actor called the *arbiter*. Currently the *arbiter* must exist
@ -590,70 +683,6 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as
*first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``.
Streaming using channels and contexts
*************************************
``Channel`` is the API which wraps an underlying *transport* and *interchange*
format to enable *inter-actor-communication*. In its present state ``tractor``
uses TCP and msgpack_.
If you aren't fond of having to write an async generator to stream data
between actors (or need something more flexible) you can instead use a
``Context``. A context wraps an actor-local spawned task and a ``Channel``
so that tasks executing across multiple processes can stream data
to one another using a low level, request oriented API.
As an example if you wanted to create a streaming server without writing
an async generator that *yields* values you instead define an async
function:
.. code:: python
async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request('http://data.feed.com')
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
All that's required is declaring a ``ctx`` argument name somewhere in
your function signature and ``tractor`` will treat the async function
like an async generator - as a streaming function from the client side.
This turns out to be handy particularly if you have
multiple tasks streaming responses concurrently:
.. code:: python
async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request(url)
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
async def stream_multiple_sources(
ctx: tractor.Context, sources: List[str]
) -> None:
async with trio.open_nursery() as n:
for url in sources:
n.start_soon(streamer, ctx, url)
The context notion comes from the context_ in nanomsg_.
.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
.. _msgpack: https://en.wikipedia.org/wiki/MessagePack
Running actors standalone Running actors standalone
************************* *************************
You don't have to spawn any actors using ``open_nursery()`` if you just You don't have to spawn any actors using ``open_nursery()`` if you just

View File

@ -2,13 +2,29 @@
Streaming via async gen api Streaming via async gen api
""" """
import time import time
from functools import partial
import trio import trio
import tractor import tractor
import pytest import pytest
async def stream_seq(sequence):
def test_must_define_ctx():
with pytest.raises(TypeError) as err:
@tractor.stream
async def no_ctx():
pass
assert "no_ctx must be `ctx: tractor.Context" in str(err.value)
@tractor.stream
async def no_ctx(ctx):
pass
async def async_gen_stream(sequence):
for i in sequence: for i in sequence:
yield i yield i
await trio.sleep(0.1) await trio.sleep(0.1)
@ -20,10 +36,23 @@ async def stream_seq(sequence):
assert cs.cancelled_caught assert cs.cancelled_caught
async def stream_from_single_subactor(): @tractor.stream
async def context_stream(ctx, sequence):
for i in sequence:
await ctx.send_yield(i)
await trio.sleep(0.1)
# block indefinitely waiting to be cancelled by ``aclose()`` call
with trio.CancelScope() as cs:
await trio.sleep(float('inf'))
assert 0
assert cs.cancelled_caught
async def stream_from_single_subactor(stream_func_name):
"""Verify we can spawn a daemon actor and retrieve streamed data. """Verify we can spawn a daemon actor and retrieve streamed data.
""" """
async with tractor.find_actor('brokerd') as portals: async with tractor.find_actor('streamerd') as portals:
if not portals: if not portals:
# only one per host address, spawns an actor if None # only one per host address, spawns an actor if None
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
@ -36,37 +65,43 @@ async def stream_from_single_subactor():
seq = range(10) seq = range(10)
agen = await portal.run( stream = await portal.run(
__name__, __name__,
'stream_seq', # the func above stream_func_name, # one of the funcs above
sequence=list(seq), # has to be msgpack serializable sequence=list(seq), # has to be msgpack serializable
) )
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
iseq = iter(seq) iseq = iter(seq)
ival = next(iseq) ival = next(iseq)
async for val in agen: async for val in stream:
assert val == ival assert val == ival
try: try:
ival = next(iseq) ival = next(iseq)
except StopIteration: except StopIteration:
# should cancel far end task which will be # should cancel far end task which will be
# caught and no error is raised # caught and no error is raised
await agen.aclose() await stream.aclose()
await trio.sleep(0.3) await trio.sleep(0.3)
try: try:
await agen.__anext__() await stream.__anext__()
except StopAsyncIteration: except StopAsyncIteration:
# stop all spawned subactors # stop all spawned subactors
await portal.cancel_actor() await portal.cancel_actor()
# await nursery.cancel() # await nursery.cancel()
def test_stream_from_single_subactor(arb_addr, start_method): @pytest.mark.parametrize(
'stream_func', ['async_gen_stream', 'context_stream']
)
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator. """Verify streaming from a spawned async generator.
""" """
tractor.run( tractor.run(
partial(
stream_from_single_subactor, stream_from_single_subactor,
stream_func_name=stream_func,
),
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
start_method=start_method, start_method=start_method,
) )

View File

@ -11,10 +11,10 @@ import trio # type: ignore
from trio import MultiError from trio import MultiError
from . import log from . import log
from ._ipc import _connect_chan, Channel, Context from ._ipc import _connect_chan, Channel
from ._actor import ( from ._streaming import Context, stream
Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor from ._discovery import get_arbiter, find_actor, wait_for_actor
) from ._actor import Actor, _start_actor, Arbiter
from ._trionics import open_nursery from ._trionics import open_nursery
from ._state import current_actor from ._state import current_actor
from ._exceptions import RemoteActorError, ModuleNotExposed from ._exceptions import RemoteActorError, ModuleNotExposed
@ -30,6 +30,7 @@ __all__ = [
'wait_for_actor', 'wait_for_actor',
'Channel', 'Channel',
'Context', 'Context',
'stream',
'MultiError', 'MultiError',
'RemoteActorError', 'RemoteActorError',
'ModuleNotExposed', 'ModuleNotExposed',

View File

@ -8,26 +8,22 @@ import importlib
import inspect import inspect
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional, Union from typing import Dict, List, Tuple, Any, Optional
import trio # type: ignore import trio # type: ignore
from async_generator import asynccontextmanager, aclosing from async_generator import aclosing
from ._ipc import Channel, _connect_chan, Context from ._ipc import Channel
from ._streaming import Context, _context
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from ._exceptions import ( from ._exceptions import (
pack_error, pack_error,
unpack_error, unpack_error,
ModuleNotExposed ModuleNotExposed
) )
from ._portal import ( from ._discovery import get_arbiter
Portal, from ._portal import Portal
open_portal,
_do_handshake,
LocalPortal,
)
from . import _state from . import _state
from ._state import current_actor
log = get_logger('tractor') log = get_logger('tractor')
@ -45,19 +41,16 @@ async def _invoke(
kwargs: Dict[str, Any], kwargs: Dict[str, Any],
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
"""Invoke local func and return results over provided channel. """Invoke local func and deliver result(s) over provided channel.
""" """
sig = inspect.signature(func)
treat_as_gen = False treat_as_gen = False
cs = None cs = None
ctx = Context(chan, cid) cancel_scope = trio.CancelScope()
if 'ctx' in sig.parameters: ctx = Context(chan, cid, cancel_scope)
_context.set(ctx)
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `ctx` kwarg in its
# signature will be treated as one.
treat_as_gen = True treat_as_gen = True
try: try:
is_async_partial = False is_async_partial = False
@ -73,7 +66,7 @@ async def _invoke(
not is_async_gen_partial not is_async_gen_partial
): ):
await chan.send({'functype': 'function', 'cid': cid}) await chan.send({'functype': 'function', 'cid': cid})
with trio.CancelScope() as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
await chan.send({'return': func(**kwargs), 'cid': cid}) await chan.send({'return': func(**kwargs), 'cid': cid})
else: else:
@ -88,7 +81,7 @@ async def _invoke(
# have to properly handle the closing (aclosing) # have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel # of the async gen in order to be sure the cancel
# is propagated! # is propagated!
with trio.CancelScope() as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
async with aclosing(coro) as agen: async with aclosing(coro) as agen:
async for item in agen: async for item in agen:
@ -113,7 +106,7 @@ async def _invoke(
# back values like an async-generator would but must # back values like an async-generator would but must
# manualy construct the response dict-packet-responses as # manualy construct the response dict-packet-responses as
# above # above
with trio.CancelScope() as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
await coro await coro
if not cs.cancelled_caught: if not cs.cancelled_caught:
@ -122,7 +115,7 @@ async def _invoke(
await chan.send({'stop': True, 'cid': cid}) await chan.send({'stop': True, 'cid': cid})
else: else:
await chan.send({'functype': 'asyncfunction', 'cid': cid}) await chan.send({'functype': 'asyncfunction', 'cid': cid})
with trio.CancelScope() as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except Exception as err: except Exception as err:
@ -174,7 +167,7 @@ class Actor:
arbiter_addr: Optional[Tuple[str, int]] = None, arbiter_addr: Optional[Tuple[str, int]] = None,
) -> None: ) -> None:
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid1())) self.uid = (name, uid or str(uuid.uuid4()))
self.rpc_module_paths = rpc_module_paths self.rpc_module_paths = rpc_module_paths
self._mods: dict = {} self._mods: dict = {}
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
@ -247,7 +240,7 @@ class Actor:
# send/receive initial handshake response # send/receive initial handshake response
try: try:
uid = await _do_handshake(self, chan) uid = await self._do_handshake(chan)
except StopAsyncIteration: except StopAsyncIteration:
log.warning(f"Channel {chan} failed to handshake") log.warning(f"Channel {chan} failed to handshake")
return return
@ -351,7 +344,7 @@ class Actor:
caller id and a ``trio.Queue`` that can be used to wait for caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop. responses delivered by the local message processing loop.
""" """
cid = str(uuid.uuid1()) cid = str(uuid.uuid4())
assert chan.uid assert chan.uid
recv_chan = self.get_memchans(chan.uid, cid) recv_chan = self.get_memchans(chan.uid, cid)
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
@ -373,11 +366,12 @@ class Actor:
msg = None msg = None
log.debug(f"Entering msg loop for {chan} from {chan.uid}") log.debug(f"Entering msg loop for {chan} from {chan.uid}")
try: try:
# internal scope allows for keeping this message with trio.CancelScope(shield=shield) as cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been # loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from # cancelled (eg. `open_portal()` may call this method from
# a locally spawned task) # a locally spawned task) and recieve this scope using
with trio.CancelScope(shield=shield) as cs: # ``scope = Nursery.start()``
task_status.started(cs) task_status.started(cs)
async for msg in chan: async for msg in chan:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
@ -385,7 +379,7 @@ class Actor:
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks: for (channel, cid) in self._rpc_tasks:
if channel is chan: if channel is chan:
self.cancel_task(cid, Context(channel, cid)) self._cancel_task(cid, channel)
log.debug( log.debug(
f"Msg loop signalled to terminate for" f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}") f" {chan} from {chan.uid}")
@ -419,6 +413,16 @@ class Actor:
f"{ns}.{funcname}({kwargs})") f"{ns}.{funcname}({kwargs})")
if ns == 'self': if ns == 'self':
func = getattr(self, funcname) func = getattr(self, funcname)
if funcname == '_cancel_task':
# XXX: a special case is made here for
# remote calls since we don't want the
# remote actor have to know which channel
# the task is associated with and we can't
# pass non-primitive types between actors.
# This means you can use:
# Portal.run('self', '_cancel_task, cid=did)
# without passing the `chan` arg.
kwargs['chan'] = chan
else: else:
# complain to client about restricted modules # complain to client about restricted modules
try: try:
@ -537,7 +541,7 @@ class Actor:
) )
await chan.connect() await chan.connect()
# initial handshake, report who we are, who they are # initial handshake, report who we are, who they are
await _do_handshake(self, chan) await self._do_handshake(chan)
except OSError: # failed to connect except OSError: # failed to connect
log.warning( log.warning(
f"Failed to connect to parent @ {parent_addr}," f"Failed to connect to parent @ {parent_addr},"
@ -661,21 +665,20 @@ class Actor:
self.cancel_server() self.cancel_server()
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
async def cancel_task(self, cid, ctx): async def _cancel_task(self, cid, chan):
"""Cancel a local task. """Cancel a local task by call-id / channel.
Note this method will be treated as a streaming funciton Note this method will be treated as a streaming function
by remote actor-callers due to the declaration of ``ctx`` by remote actor-callers due to the declaration of ``ctx``
in the signature (for now). in the signature (for now).
""" """
# right now this is only implicitly called by # right now this is only implicitly called by
# streaming IPC but it should be called # streaming IPC but it should be called
# to cancel any remotely spawned task # to cancel any remotely spawned task
chan = ctx.chan
try: try:
# this ctx based lookup ensures the requested task to # this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel # be cancelled was indeed spawned by a request from this channel
scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] scope, func, is_complete = self._rpc_tasks[(chan, cid)]
except KeyError: except KeyError:
log.warning(f"{cid} has already completed/terminated?") log.warning(f"{cid} has already completed/terminated?")
return return
@ -686,7 +689,7 @@ class Actor:
# don't allow cancelling this function mid-execution # don't allow cancelling this function mid-execution
# (is this necessary?) # (is this necessary?)
if func is self.cancel_task: if func is self._cancel_task:
return return
scope.cancel() scope.cancel()
@ -704,7 +707,7 @@ class Actor:
log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (chan, cid) in tasks.copy(): for (chan, cid) in tasks.copy():
# TODO: this should really done in a nursery batch # TODO: this should really done in a nursery batch
await self.cancel_task(cid, Context(chan, cid)) await self._cancel_task(cid, chan)
# if tasks: # if tasks:
log.info( log.info(
f"Waiting for remaining rpc tasks to complete {tasks}") f"Waiting for remaining rpc tasks to complete {tasks}")
@ -735,6 +738,25 @@ class Actor:
"""Return all channels to the actor with provided uid.""" """Return all channels to the actor with provided uid."""
return self._peers[uid] return self._peers[uid]
async def _do_handshake(
self,
chan: Channel
) -> Tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step.
These are essentially the "mailbox addresses" found in actor model
parlance.
"""
await chan.send(self.uid)
uid: Tuple[str, str] = await chan.recv()
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who knows all the other actors and always has
@ -840,66 +862,3 @@ async def _start_actor(
log.info("Completed async main") log.info("Completed async main")
return result return result
@asynccontextmanager
async def get_arbiter(
host: str, port: int
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote
arbiter.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
@asynccontextmanager
async def find_actor(
name: str, arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
elif sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@asynccontextmanager
async def wait_for_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal

View File

@ -0,0 +1,77 @@
"""
Actor discovery API.
"""
import typing
from typing import Tuple, Optional, Union
from async_generator import asynccontextmanager
from ._ipc import _connect_chan
from ._portal import (
Portal,
open_portal,
LocalPortal,
)
from ._state import current_actor
@asynccontextmanager
async def get_arbiter(
host: str, port: int
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote
arbiter.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
@asynccontextmanager
async def find_actor(
name: str, arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
elif sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@asynccontextmanager
async def wait_for_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal

View File

@ -1,7 +1,6 @@
""" """
Inter-process comms abstractions Inter-process comms abstractions
""" """
from dataclasses import dataclass
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional
@ -205,28 +204,6 @@ class Channel:
return self.msgstream.connected() if self.msgstream else False return self.msgstream.connected() if self.msgstream else False
@dataclass(frozen=True)
class Context:
"""An IAC (inter-actor communication) context.
Allows maintaining task or protocol specific state between communicating
actors. A unique context is created on the receiving end for every request
to a remote actor.
"""
chan: Channel
cid: str
# TODO: we should probably attach the actor-task
# cancel scope here now that trio is exposing it
# as a public object
async def send_yield(self, data: Any) -> None:
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
@asynccontextmanager @asynccontextmanager
async def _connect_chan( async def _connect_chan(
host: str, port: int host: str, port: int

View File

@ -33,21 +33,6 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
yield nursery yield nursery
async def _do_handshake(
actor: 'Actor', # type: ignore
chan: Channel
) -> Any:
await chan.send(actor.uid)
uid: Tuple[str, str] = await chan.recv()
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
class StreamReceiveChannel(trio.abc.ReceiveChannel): class StreamReceiveChannel(trio.abc.ReceiveChannel):
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
special behaviour for signalling stream termination across an special behaviour for signalling stream termination across an
@ -95,8 +80,8 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel):
raise unpack_error(msg, self._portal.channel) raise unpack_error(msg, self._portal.channel)
async def aclose(self): async def aclose(self):
"""Cancel associate remote actor task on close """Cancel associated remote actor task and local memory channel
as well as the local memory channel. on close.
""" """
if self._rx_chan._closed: if self._rx_chan._closed:
log.warning(f"{self} is already closed") log.warning(f"{self} is already closed")
@ -107,15 +92,10 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel):
log.warning( log.warning(
f"Cancelling stream {cid} to " f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}") f"{self._portal.channel.uid}")
# TODO: yeah.. it'd be nice if this was just an # NOTE: we're telling the far end actor to cancel a task
# async func on the far end. Gotta figure out a # corresponding to *this actor*. The far end local channel
# better way then implicitly feeding the ctx # instance is passed to `Actor._cancel_task()` implicitly.
# to declaring functions; likely a decorator await self._portal.run('self', '_cancel_task', cid=cid)
# system.
rchan = await self._portal.run(
'self', 'cancel_task', cid=cid)
async for _ in rchan:
pass
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed # XXX: there's no way to know if the remote task was indeed
@ -153,6 +133,7 @@ class Portal:
Tuple[str, Any, str, Dict[str, Any]] Tuple[str, Any, str, Dict[str, Any]]
] = None ] = None
self._streams: Set[StreamReceiveChannel] = set() self._streams: Set[StreamReceiveChannel] = set()
self.actor = current_actor()
async def _submit( async def _submit(
self, self,
@ -167,7 +148,7 @@ class Portal:
This is an async call. This is an async call.
""" """
# ship a function call request to the remote actor # ship a function call request to the remote actor
cid, recv_chan = await current_actor().send_cmd( cid, recv_chan = await self.actor.send_cmd(
self.channel, ns, func, kwargs) self.channel, ns, func, kwargs)
# wait on first response msg and handle (this should be # wait on first response msg and handle (this should be
@ -345,7 +326,7 @@ async def open_portal(
was_connected = True was_connected = True
if channel.uid is None: if channel.uid is None:
await _do_handshake(actor, channel) await actor._do_handshake(channel)
msg_loop_cs = await nursery.start( msg_loop_cs = await nursery.start(
partial( partial(

View File

@ -0,0 +1,49 @@
import inspect
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any
import trio
from ._ipc import Channel
_context: ContextVar['Context'] = ContextVar('context')
@dataclass(frozen=True)
class Context:
"""An IAC (inter-actor communication) context.
Allows maintaining task or protocol specific state between communicating
actors. A unique context is created on the receiving end for every request
to a remote actor.
"""
chan: Channel
cid: str
cancel_scope: trio.CancelScope
async def send_yield(self, data: Any) -> None:
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
def current_context():
"""Get the current task's context instance.
"""
return _context.get()
def stream(func):
"""Mark an async function as a streaming routine.
"""
func._tractor_stream_function = True
sig = inspect.signature(func)
if 'ctx' not in sig.parameters:
raise TypeError(
"The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func

View File

@ -12,7 +12,7 @@ import wrapt
from .log import get_logger from .log import get_logger
from . import current_actor from . import current_actor
from ._ipc import Context from ._streaming import Context
__all__ = ['pub'] __all__ = ['pub']
@ -97,29 +97,32 @@ def pub(
): ):
"""Publisher async generator decorator. """Publisher async generator decorator.
A publisher can be called multiple times from different actors A publisher can be called multiple times from different actors but
but will only spawn a finite set of internal tasks to stream values will only spawn a finite set of internal tasks to stream values to
to each caller. The ``tasks` argument to the decorator (``Set[str]``) each caller. The ``tasks: Set[str]`` argument to the decorator
specifies the names of the mutex set of publisher tasks. specifies the names of the mutex set of publisher tasks. When the
When the publisher function is called, an argument ``task_name`` must be publisher function is called, an argument ``task_name`` must be
passed to specify which task (of the set named in ``tasks``) should be passed to specify which task (of the set named in ``tasks``) should
used. This allows for using the same publisher with different input be used. This allows for using the same publisher with different
(arguments) without allowing more concurrent tasks then necessary. input (arguments) without allowing more concurrent tasks then
necessary.
Values yielded from the decorated async generator Values yielded from the decorated async generator must be
must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the ``Dict[str, Dict[str, Any]]`` where the fist level key is the topic
topic string an determines which subscription the packet will be delivered string and determines which subscription the packet will be
to and the value is a packet ``Dict[str, Any]`` by default of the form: delivered to and the value is a packet ``Dict[str, Any]`` by default
of the form:
.. ::python .. ::python
{topic: value} {topic: str: value: Any}
The caller can instead opt to pass a ``packetizer`` callback who's return The caller can instead opt to pass a ``packetizer`` callback who's
value will be delivered as the published response. return value will be delivered as the published response.
The decorated function must *accept* an argument :func:`get_topics` which The decorated async generator function must accept an argument
dynamically returns the tuple of current subscriber topics: :func:`get_topics` which dynamically returns the tuple of current
subscriber topics:
.. code:: python .. code:: python
@ -162,15 +165,15 @@ def pub(
print(f"Subscriber received {value}") print(f"Subscriber received {value}")
Here, you don't need to provide the ``ctx`` argument since the remote actor Here, you don't need to provide the ``ctx`` argument since the
provides it automatically to the spawned task. If you were to call remote actor provides it automatically to the spawned task. If you
``pub_service()`` directly from a wrapping function you would need to were to call ``pub_service()`` directly from a wrapping function you
provide this explicitly. would need to provide this explicitly.
Remember you only need this if you need *a finite set of tasks* running in Remember you only need this if you need *a finite set of tasks*
a single actor to stream data to an arbitrary number of subscribers. If you running in a single actor to stream data to an arbitrary number of
are ok to have a new task running for every call to ``pub_service()`` then subscribers. If you are ok to have a new task running for every call
probably don't need this. to ``pub_service()`` then probably don't need this.
""" """
# handle the decorator not called with () case # handle the decorator not called with () case
if wrapped is None: if wrapped is None:
@ -181,10 +184,7 @@ def pub(
for name in tasks: for name in tasks:
task2lock[name] = trio.StrictFIFOLock() task2lock[name] = trio.StrictFIFOLock()
async def takes_ctx(get_topics, ctx=None): @wrapt.decorator
pass
@wrapt.decorator(adapter=takes_ctx)
async def wrapper(agen, instance, args, kwargs): async def wrapper(agen, instance, args, kwargs):
# this is used to extract arguments properly as per # this is used to extract arguments properly as per
# the `wrapt` docs # the `wrapt` docs
@ -249,7 +249,6 @@ def pub(
# invoke it # invoke it
await _execute(*args, **kwargs) await _execute(*args, **kwargs)
funcname = wrapped.__name__ funcname = wrapped.__name__
if not inspect.isasyncgenfunction(wrapped): if not inspect.isasyncgenfunction(wrapped):
raise TypeError( raise TypeError(
@ -261,4 +260,8 @@ def pub(
"`get_topics` argument" "`get_topics` argument"
) )
# XXX: manually monkey the wrapped function since
# ``wrapt.decorator`` doesn't seem to want to play nice with its
# whole "adapter" thing...
wrapped._tractor_stream_function = True # type: ignore
return wrapper(wrapped) return wrapper(wrapped)