More pub decorator improvements

- when calling the async gen func provided by the user wrap it in
  `async_generator.aclosing()` to ensure correct teardown at cancel time
  (a short illustration follows the fan-out hunk below)
- expect the gen to yield a dict with topic keys and data values
- add a `packetizer` function argument to the API allowing a user
  to format the data to be published in whatever way desired (usage
  sketched below)
- support using the decorator without the parentheses (using default
  arguments)
- use a `wrapt` "adapter" to override the signature presented to the
  `_actor._invoke` inspection machinery
- handle the default case where `tasks` isn't provided; allow only one
  concurrent publisher task
- store task locks in an actor local variable
- add a comprehensive doc string
Tyler Goodlet 2019-01-21 08:35:43 -05:00
parent 97f709cc14
commit b6cc1e8c22
1 changed file with 137 additions and 53 deletions
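In rough strokes, the new decorator surface looks like this (a minimal
sketch against the post-commit API; the `single_stream`/`dual_stream`
publishers and their yield payloads are made up for illustration):

    import trio
    from tractor.msg import pub

    # bare form: works without parentheses now that `wrapped` is an
    # optional first positional arg; a single concurrent publisher
    # task (keyed `None`) is allowed by default
    @pub
    async def single_stream(get_topics):
        while True:
            await trio.sleep(1)
            # yield a dict with topic keys and data values
            yield {topic: {'tick': 1} for topic in get_topics()}

    # named-task form: `tasks` declares the mutex set of publisher
    # task names; callers must then pass `task_name` to pick one
    @pub(tasks={'fast', 'slow'})
    async def dual_stream(get_topics, period: float = 1.0):
        while True:
            await trio.sleep(period)
            yield {topic: {'period': period} for topic in get_topics()}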


@@ -2,8 +2,9 @@
 Messaging pattern APIs and helpers.
 """
 import typing
-from typing import Dict, Any
+from typing import Dict, Any, Sequence
 from functools import partial
+from async_generator import aclosing

 import trio
 import wrapt
@@ -17,44 +18,46 @@ log = get_logger('messaging')

 async def fan_out_to_ctxs(
-    pub_gen: typing.Callable,  # it's an async gen ... gd mypy
+    pub_async_gen_func: typing.Callable,  # it's an async gen ... gd mypy
     topics2ctxs: Dict[str, set],
-    topic_key: str = 'key',
+    packetizer: typing.Callable = None,
 ) -> None:
     """Request and fan out quotes to each subscribed actor channel.
     """
     def get_topics():
         return tuple(topics2ctxs.keys())

-    async for published in pub_gen(
-        get_topics=get_topics,
-    ):
-        ctx_payloads: Dict[str, Any] = {}
-        for packet_key, data in published.items():
-            # grab each suscription topic using provided key for lookup
-            topic = data[topic_key]
-            # build a new dict packet for passing to multiple channels
-            packet = {packet_key: data}
-            for ctx in topics2ctxs.get(topic, set()):
-                ctx_payloads.setdefault(ctx, {}).update(packet),
+    agen = pub_async_gen_func(get_topics=get_topics)
+    async with aclosing(agen) as pub_gen:
+        async for published in pub_gen:
+            ctx_payloads: Dict[str, Any] = {}
+            for topic, data in published.items():
+                log.debug(f"publishing {topic, data}")
+                # build a new dict packet or invoke provided packetizer
+                if packetizer is None:
+                    packet = {topic: data}
+                else:
+                    packet = packetizer(topic, data)
+                for ctx in topics2ctxs.get(topic, set()):
+                    ctx_payloads.setdefault(ctx, {}).update(packet),

-        # deliver to each subscriber (fan out)
-        if ctx_payloads:
-            for ctx, payload in ctx_payloads.items():
-                try:
-                    await ctx.send_yield(payload)
-                except (
-                    # That's right, anything you can think of...
-                    trio.ClosedStreamError, ConnectionResetError,
-                    ConnectionRefusedError,
-                ):
-                    log.warn(f"{ctx.chan} went down?")
-                    for ctx_set in topics2ctxs.values():
-                        ctx_set.discard(ctx)
+            # deliver to each subscriber (fan out)
+            if ctx_payloads:
+                for ctx, payload in ctx_payloads.items():
+                    try:
+                        await ctx.send_yield(payload)
+                    except (
+                        # That's right, anything you can think of...
+                        trio.ClosedStreamError, ConnectionResetError,
+                        ConnectionRefusedError,
+                    ):
+                        log.warning(f"{ctx.chan} went down?")
+                        for ctx_set in topics2ctxs.values():
+                            ctx_set.discard(ctx)

-        if not any(topics2ctxs.values()):
-            log.warn(f"No subscribers left for {pub_gen.__name__}")
-            break
+            if not get_topics():
+                log.warning(f"No subscribers left for {pub_gen}")
+                break


 def modify_subs(topics2ctxs, topics, ctx):
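A quick aside on why the `aclosing` wrapper matters here (an
illustrative sketch, not part of the diff): without it, a cancelled
`async for` can leave the user's generator un-finalized, skipping its
`finally` blocks.

    from async_generator import aclosing

    async def consume(pub_gen_func):
        # `aclosing` guarantees `agen.aclose()` runs on exit, so any
        # `try/finally` teardown inside the user's async generator
        # fires even when iteration is cancelled mid-stream
        async with aclosing(pub_gen_func()) as agen:
            async for item in agen:
                print(item)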
@@ -62,7 +65,7 @@ def modify_subs(topics2ctxs, topics, ctx):
     Effectively a symbol subscription api.
     """
-    log.info(f"{ctx.chan} changed subscription to {topics}")
+    log.info(f"{ctx.chan.uid} changed subscription to {topics}")

     # update map from each symbol to requesting client's chan
     for ticker in topics:
@@ -82,46 +85,127 @@ def modify_subs(topics2ctxs, topics, ctx):
             topics2ctxs.pop(ticker)


-def pub(*, tasks=()):
+def pub(
+    wrapped: typing.Callable = None,
+    *,
+    tasks: Sequence[str] = set(),
+):
     """Publisher async generator decorator.

-    A publisher can be called many times from different actor's
-    remote tasks but will only spawn one internal task to deliver
-    values to all callers. Values yielded from the decorated
-    async generator are sent back to each calling task, filtered by
-    topic on the producer (server) side.
-
-    Must be called with a topic as the first arg.
+    A publisher can be called multiple times from different actors
+    but will only spawn a finite set of internal tasks to stream values
+    to each caller. The ``tasks`` argument to the decorator (``Set[str]``)
+    specifies the names of the mutex set of publisher tasks.
+    When the publisher function is called, an argument ``task_name`` must be
+    passed to specify which task (of the set named in ``tasks``) should be
+    used. This allows for using the same publisher with different input
+    (arguments) without allowing more concurrent tasks than necessary.
+
+    Values yielded from the decorated async generator
+    must be ``Dict[str, Dict[str, Any]]`` where the first level key is the
+    topic string and determines which subscription the packet will be delivered
+    to and the value is a packet ``Dict[str, Any]`` by default of the form:
+
+    .. code:: python
+
+        {topic: value}
+
+    The caller can instead opt to pass a ``packetizer`` callback whose return
+    value will be delivered as the published response.
+
+    The decorated function must *accept* an argument :func:`get_topics` which
+    dynamically returns the tuple of current subscriber topics:
+
+    .. code:: python
+
+        from tractor.msg import pub
+
+        @pub(tasks={'source_1', 'source_2'})
+        async def pub_service(get_topics):
+            data = await web_request(endpoints=get_topics())
+            for item in data:
+                yield data['key'], data
+
+    The publisher must be called passing in the following arguments:
+    - ``topics: Sequence[str]`` the topic sequence or "subscriptions"
+    - ``task_name: str`` the task to use (if ``tasks`` was passed)
+    - ``ctx: Context`` the tractor context (only needed if calling the
+      pub func without a nursery, otherwise this is provided implicitly)
+    - packetizer: ``Callable[[str, Any], Any]`` a callback which receives
+      the topic and value from the publisher function each ``yield`` such that
+      whatever is returned is sent as the published value to subscribers of
+      that topic. By default this is a dict ``{topic: value}``.
+
+    As an example, to make a subscriber call the above function:
+
+    .. code:: python
+
+        from functools import partial
+        import tractor
+
+        async with tractor.open_nursery() as n:
+            portal = n.run_in_actor(
+                'publisher',  # actor name
+                partial(      # func to execute in it
+                    pub_service,
+                    topics=('clicks', 'users'),
+                    task_name='source1',
+                )
+            )
+
+            async for value in portal.result():
+                print(f"Subscriber received {value}")
+
+    Here, you don't need to provide the ``ctx`` argument since the remote actor
+    provides it automatically to the spawned task. If you were to call
+    ``pub_service()`` directly from a wrapping function you would need to
+    provide this explicitly.
+
+    Remember you only need this if you need *a finite set of tasks* running in
+    a single actor to stream data to an arbitrary number of subscribers. If you
+    are ok with having a new task running for every call to ``pub_service()``
+    then you probably don't need this.
     """
-    task2lock = {}
+    # handle the decorator not called with () case
+    if wrapped is None:
+        return partial(pub, tasks=tasks)
+
+    task2lock = {None: trio.StrictFIFOLock()}
     for name in tasks:
         task2lock[name] = trio.StrictFIFOLock()

-    @wrapt.decorator
+    async def takes_ctx(get_topics, ctx=None):
+        pass
+
+    @wrapt.decorator(adapter=takes_ctx)
     async def wrapper(agen, instance, args, kwargs):
+        task_name = None
         if tasks:
-            task_key = kwargs.pop('task_key')
-            if not task_key:
+            try:
+                task_name = kwargs.pop('task_name')
+            except KeyError:
                 raise TypeError(
-                    f"{agen} must be called with a `task_key` named argument "
+                    f"{agen} must be called with a `task_name` named argument "
                     f"with a value from {tasks}")

         # pop required kwargs used internally
         ctx = kwargs.pop('ctx')
         topics = kwargs.pop('topics')
-        topic_key = kwargs.pop('topic_key')
+        packetizer = kwargs.pop('packetizer', None)

-        lock = task2lock[task_key]
         ss = current_actor().statespace
+        lockmap = ss.setdefault('_pubtask2lock', task2lock)
+        lock = lockmap[task_name]
+
         all_subs = ss.setdefault('_subs', {})
-        topics2ctxs = all_subs.setdefault(task_key, {})
+        topics2ctxs = all_subs.setdefault(task_name, {})

         try:
             modify_subs(topics2ctxs, topics, ctx)
             # block and let existing feed task deliver
             # stream data until it is cancelled in which case
-            # we'll take over and spawn it again
-            # update map from each symbol to requesting client's chan
+            # the next waiting task will take over and spawn it again
             async with lock:
                 # no data feeder task yet; so start one
                 respawn = True
@@ -132,11 +216,11 @@ def pub(*, tasks=()):
                     # unblocks when no more symbol subscriptions exist
                     # and the streamer task terminates
                     await fan_out_to_ctxs(
-                        pub_gen=partial(agen, *args, **kwargs),
+                        pub_async_gen_func=partial(agen, *args, **kwargs),
                         topics2ctxs=topics2ctxs,
-                        topic_key=topic_key,
+                        packetizer=packetizer,
                     )
-                    log.info(f"Terminating stream task for {task_key}"
+                    log.info(f"Terminating stream task {task_name or ''}"
                              f" for {agen.__name__}")
                 except trio.BrokenResourceError:
                     log.exception("Respawning failed data feed task")
@@ -148,6 +232,6 @@ def pub(*, tasks=()):
             # if there are truly no more subscriptions with this broker
             # drop from broker subs dict
             if not any(topics2ctxs.values()):
-                log.info(f"No more subscriptions for publisher {task_key}")
+                log.info(f"No more subscriptions for publisher {task_name}")

-    return wrapper
+    return wrapper(wrapped)
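For completeness, here's what calling a publisher with the new
`packetizer` hook might look like, mirroring the subscriber example in
the docstring above (`timestamped` and `dual_stream` are hypothetical,
the latter from the earlier sketch; the keyword names match the ones
popped in `wrapper`):

    import time
    from functools import partial
    import tractor

    def timestamped(topic, data):
        # whatever the packetizer returns is delivered to subscribers
        # verbatim in place of the default `{topic: data}` packet
        return {topic: {'data': data, 'time': time.time()}}

    async def main():
        async with tractor.open_nursery() as n:
            portal = n.run_in_actor(
                'publisher',            # actor name
                partial(                # func to execute in it
                    dual_stream,
                    topics=('clicks', 'users'),
                    task_name='fast',
                    packetizer=timestamped,
                )
            )
            async for packet in portal.result():
                print(f"got {packet}")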