diff --git a/nooz/298.misc.rst b/nooz/298.misc.rst new file mode 100644 index 0000000..77a2acc --- /dev/null +++ b/nooz/298.misc.rst @@ -0,0 +1,2 @@ +Add a new `tractor.experimental` subpackage for staging new high level +APIs and subystems that we might eventually make built-ins. diff --git a/setup.py b/setup.py index b17bb57..77b43f1 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ setup( platforms=['linux', 'windows'], packages=[ 'tractor', + 'tractor.experimental', 'tractor.trionics', 'tractor.testing', ], diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 6243a4e..390ca29 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -5,19 +5,20 @@ import pytest import trio import tractor from tractor.testing import tractor_test +from tractor.experimental import msgpub def test_type_checks(): with pytest.raises(TypeError) as err: - @tractor.msg.pub + @msgpub async def no_get_topics(yo): yield assert "must define a `get_topics`" in str(err.value) with pytest.raises(TypeError) as err: - @tractor.msg.pub + @msgpub def not_async_gen(yo): pass @@ -32,7 +33,7 @@ def is_even(i): _get_topics = None -@tractor.msg.pub +@msgpub async def pubber(get_topics, seed=10): # ensure topic subscriptions are as expected @@ -103,7 +104,7 @@ async def subs( await stream.aclose() -@tractor.msg.pub(tasks=['one', 'two']) +@msgpub(tasks=['one', 'two']) async def multilock_pubber(get_topics): yield {'doggy': 10} diff --git a/tractor/experimental/__init__.py b/tractor/experimental/__init__.py new file mode 100644 index 0000000..4fad3be --- /dev/null +++ b/tractor/experimental/__init__.py @@ -0,0 +1,29 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Experimental APIs and subsystems not yet validated to be included as +built-ins. + +This is a staging area for ``tractor.builtin``. + +''' +from ._pubsub import pub as msgpub + + +__all__ = [ + 'msgpub', +] diff --git a/tractor/experimental/_pubsub.py b/tractor/experimental/_pubsub.py new file mode 100644 index 0000000..7a8ec37 --- /dev/null +++ b/tractor/experimental/_pubsub.py @@ -0,0 +1,329 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Single target entrypoint, remote-task, dynamic (no push if no consumer) +pubsub API using async an generator which muli-plexes to consumers by +key. + +NOTE: this module is likely deprecated by the new bi-directional streaming +support provided by ``tractor.Context.open_stream()`` and friends. + +""" +from __future__ import annotations +import inspect +import typing +from typing import Dict, Any, Set, Callable, List, Tuple +from functools import partial +from async_generator import aclosing + +import trio +import wrapt + +from ..log import get_logger +from .._streaming import Context + + +__all__ = ['pub'] + +log = get_logger('messaging') + + +async def fan_out_to_ctxs( + pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy + topics2ctxs: Dict[str, list], + packetizer: typing.Callable = None, +) -> None: + ''' + Request and fan out quotes to each subscribed actor channel. + + ''' + + def get_topics(): + return tuple(topics2ctxs.keys()) + + agen = pub_async_gen_func(get_topics=get_topics) + + async with aclosing(agen) as pub_gen: + + async for published in pub_gen: + + ctx_payloads: List[Tuple[Context, Any]] = [] + + for topic, data in published.items(): + log.debug(f"publishing {topic, data}") + + # build a new dict packet or invoke provided packetizer + if packetizer is None: + packet = {topic: data} + + else: + packet = packetizer(topic, data) + + for ctx in topics2ctxs.get(topic, list()): + ctx_payloads.append((ctx, packet)) + + if not ctx_payloads: + log.debug(f"Unconsumed values:\n{published}") + + # deliver to each subscriber (fan out) + if ctx_payloads: + for ctx, payload in ctx_payloads: + try: + await ctx.send_yield(payload) + except ( + # That's right, anything you can think of... + trio.ClosedResourceError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warning(f"{ctx.chan} went down?") + for ctx_list in topics2ctxs.values(): + try: + ctx_list.remove(ctx) + except ValueError: + continue + + if not get_topics(): + log.warning(f"No subscribers left for {pub_gen}") + break + + +def modify_subs( + + topics2ctxs: Dict[str, List[Context]], + topics: Set[str], + ctx: Context, + +) -> None: + """Absolute symbol subscription list for each quote stream. + + Effectively a symbol subscription api. + """ + log.info(f"{ctx.chan.uid} changed subscription to {topics}") + + # update map from each symbol to requesting client's chan + for topic in topics: + topics2ctxs.setdefault(topic, list()).append(ctx) + + # remove any existing symbol subscriptions if symbol is not + # found in ``symbols`` + # TODO: this can likely be factored out into the pub-sub api + for topic in filter( + lambda topic: topic not in topics, topics2ctxs.copy() + ): + ctx_list = topics2ctxs.get(topic) + if ctx_list: + try: + ctx_list.remove(ctx) + except ValueError: + pass + + if not ctx_list: + # pop empty sets which will trigger bg quoter task termination + topics2ctxs.pop(topic) + + +_pub_state: Dict[str, dict] = {} +_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {} + + +def pub( + wrapped: typing.Callable = None, + *, + tasks: Set[str] = set(), +): + """Publisher async generator decorator. + + A publisher can be called multiple times from different actors but + will only spawn a finite set of internal tasks to stream values to + each caller. The ``tasks: Set[str]`` argument to the decorator + specifies the names of the mutex set of publisher tasks. When the + publisher function is called, an argument ``task_name`` must be + passed to specify which task (of the set named in ``tasks``) should + be used. This allows for using the same publisher with different + input (arguments) without allowing more concurrent tasks then + necessary. + + Values yielded from the decorated async generator must be + ``Dict[str, Dict[str, Any]]`` where the fist level key is the topic + string and determines which subscription the packet will be + delivered to and the value is a packet ``Dict[str, Any]`` by default + of the form: + + .. ::python + + {topic: str: value: Any} + + The caller can instead opt to pass a ``packetizer`` callback who's + return value will be delivered as the published response. + + The decorated async generator function must accept an argument + :func:`get_topics` which dynamically returns the tuple of current + subscriber topics: + + .. code:: python + + from tractor.msg import pub + + @pub(tasks={'source_1', 'source_2'}) + async def pub_service(get_topics): + data = await web_request(endpoints=get_topics()) + for item in data: + yield data['key'], data + + + The publisher must be called passing in the following arguments: + - ``topics: Set[str]`` the topic sequence or "subscriptions" + - ``task_name: str`` the task to use (if ``tasks`` was passed) + - ``ctx: Context`` the tractor context (only needed if calling the + pub func without a nursery, otherwise this is provided implicitly) + - packetizer: ``Callable[[str, Any], Any]`` a callback who receives + the topic and value from the publisher function each ``yield`` such that + whatever is returned is sent as the published value to subscribers of + that topic. By default this is a dict ``{topic: str: value: Any}``. + + As an example, to make a subscriber call the above function: + + .. code:: python + + from functools import partial + import tractor + + async with tractor.open_nursery() as n: + portal = n.run_in_actor( + 'publisher', # actor name + partial( # func to execute in it + pub_service, + topics=('clicks', 'users'), + task_name='source1', + ) + ) + async for value in await portal.result(): + print(f"Subscriber received {value}") + + + Here, you don't need to provide the ``ctx`` argument since the + remote actor provides it automatically to the spawned task. If you + were to call ``pub_service()`` directly from a wrapping function you + would need to provide this explicitly. + + Remember you only need this if you need *a finite set of tasks* + running in a single actor to stream data to an arbitrary number of + subscribers. If you are ok to have a new task running for every call + to ``pub_service()`` then probably don't need this. + """ + global _pubtask2lock + + # handle the decorator not called with () case + if wrapped is None: + return partial(pub, tasks=tasks) + + task2lock: Dict[str, trio.StrictFIFOLock] = {} + + for name in tasks: + task2lock[name] = trio.StrictFIFOLock() + + @wrapt.decorator + async def wrapper(agen, instance, args, kwargs): + + # XXX: this is used to extract arguments properly as per the + # `wrapt` docs + async def _execute( + ctx: Context, + topics: Set[str], + *args, + # *, + task_name: str = None, # default: only one task allocated + packetizer: Callable = None, + **kwargs, + ): + if task_name is None: + task_name = trio.lowlevel.current_task().name + + if tasks and task_name not in tasks: + raise TypeError( + f"{agen} must be called with a `task_name` named " + f"argument with a value from {tasks}") + + elif not tasks and not task2lock: + # add a default root-task lock if none defined + task2lock[task_name] = trio.StrictFIFOLock() + + _pubtask2lock.update(task2lock) + + topics = set(topics) + lock = _pubtask2lock[task_name] + + all_subs = _pub_state.setdefault('_subs', {}) + topics2ctxs = all_subs.setdefault(task_name, {}) + + try: + modify_subs(topics2ctxs, topics, ctx) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # the next waiting task will take over and spawn it again + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info( + f"Spawning data feed task for {funcname}") + try: + # unblocks when no more symbols subscriptions exist + # and the streamer task terminates + await fan_out_to_ctxs( + pub_async_gen_func=partial( + agen, *args, **kwargs), + topics2ctxs=topics2ctxs, + packetizer=packetizer, + ) + log.info( + f"Terminating stream task {task_name or ''}" + f" for {agen.__name__}") + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True + finally: + # remove all subs for this context + modify_subs(topics2ctxs, set(), ctx) + + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(topics2ctxs.values()): + log.info( + f"No more subscriptions for publisher {task_name}") + + # invoke it + await _execute(*args, **kwargs) + + funcname = wrapped.__name__ + if not inspect.isasyncgenfunction(wrapped): + raise TypeError( + f"Publisher {funcname} must be an async generator function" + ) + if 'get_topics' not in inspect.signature(wrapped).parameters: + raise TypeError( + f"Publisher async gen {funcname} must define a " + "`get_topics` argument" + ) + + # XXX: manually monkey the wrapped function since + # ``wrapt.decorator`` doesn't seem to want to play nice with its + # whole "adapter" thing... + wrapped._tractor_stream_function = True # type: ignore + + return wrapper(wrapped) diff --git a/tractor/msg.py b/tractor/msg.py index caa03ec..16f4e5c 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -14,309 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Messaging pattern APIs and helpers. +''' +Coming soon! -NOTE: this module is likely deprecated by the new bi-directional streaming -support provided by ``tractor.Context.open_stream()`` and friends. - -""" -import inspect -import typing -from typing import Dict, Any, Set, Callable, List, Tuple -from functools import partial -from async_generator import aclosing - -import trio -import wrapt - -from .log import get_logger -from ._streaming import Context - -__all__ = ['pub'] - -log = get_logger('messaging') - - -async def fan_out_to_ctxs( - pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy - topics2ctxs: Dict[str, list], - packetizer: typing.Callable = None, -) -> None: - """Request and fan out quotes to each subscribed actor channel. - """ - def get_topics(): - return tuple(topics2ctxs.keys()) - - agen = pub_async_gen_func(get_topics=get_topics) - - async with aclosing(agen) as pub_gen: - - async for published in pub_gen: - - ctx_payloads: List[Tuple[Context, Any]] = [] - - for topic, data in published.items(): - log.debug(f"publishing {topic, data}") - - # build a new dict packet or invoke provided packetizer - if packetizer is None: - packet = {topic: data} - - else: - packet = packetizer(topic, data) - - for ctx in topics2ctxs.get(topic, list()): - ctx_payloads.append((ctx, packet)) - - if not ctx_payloads: - log.debug(f"Unconsumed values:\n{published}") - - # deliver to each subscriber (fan out) - if ctx_payloads: - for ctx, payload in ctx_payloads: - try: - await ctx.send_yield(payload) - except ( - # That's right, anything you can think of... - trio.ClosedResourceError, ConnectionResetError, - ConnectionRefusedError, - ): - log.warning(f"{ctx.chan} went down?") - for ctx_list in topics2ctxs.values(): - try: - ctx_list.remove(ctx) - except ValueError: - continue - - if not get_topics(): - log.warning(f"No subscribers left for {pub_gen}") - break - - -def modify_subs( - - topics2ctxs: Dict[str, List[Context]], - topics: Set[str], - ctx: Context, - -) -> None: - """Absolute symbol subscription list for each quote stream. - - Effectively a symbol subscription api. - """ - log.info(f"{ctx.chan.uid} changed subscription to {topics}") - - # update map from each symbol to requesting client's chan - for topic in topics: - topics2ctxs.setdefault(topic, list()).append(ctx) - - # remove any existing symbol subscriptions if symbol is not - # found in ``symbols`` - # TODO: this can likely be factored out into the pub-sub api - for topic in filter( - lambda topic: topic not in topics, topics2ctxs.copy() - ): - ctx_list = topics2ctxs.get(topic) - if ctx_list: - try: - ctx_list.remove(ctx) - except ValueError: - pass - - if not ctx_list: - # pop empty sets which will trigger bg quoter task termination - topics2ctxs.pop(topic) - - -_pub_state: Dict[str, dict] = {} -_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {} - - -def pub( - wrapped: typing.Callable = None, - *, - tasks: Set[str] = set(), -): - """Publisher async generator decorator. - - A publisher can be called multiple times from different actors but - will only spawn a finite set of internal tasks to stream values to - each caller. The ``tasks: Set[str]`` argument to the decorator - specifies the names of the mutex set of publisher tasks. When the - publisher function is called, an argument ``task_name`` must be - passed to specify which task (of the set named in ``tasks``) should - be used. This allows for using the same publisher with different - input (arguments) without allowing more concurrent tasks then - necessary. - - Values yielded from the decorated async generator must be - ``Dict[str, Dict[str, Any]]`` where the fist level key is the topic - string and determines which subscription the packet will be - delivered to and the value is a packet ``Dict[str, Any]`` by default - of the form: - - .. ::python - - {topic: str: value: Any} - - The caller can instead opt to pass a ``packetizer`` callback who's - return value will be delivered as the published response. - - The decorated async generator function must accept an argument - :func:`get_topics` which dynamically returns the tuple of current - subscriber topics: - - .. code:: python - - from tractor.msg import pub - - @pub(tasks={'source_1', 'source_2'}) - async def pub_service(get_topics): - data = await web_request(endpoints=get_topics()) - for item in data: - yield data['key'], data - - - The publisher must be called passing in the following arguments: - - ``topics: Set[str]`` the topic sequence or "subscriptions" - - ``task_name: str`` the task to use (if ``tasks`` was passed) - - ``ctx: Context`` the tractor context (only needed if calling the - pub func without a nursery, otherwise this is provided implicitly) - - packetizer: ``Callable[[str, Any], Any]`` a callback who receives - the topic and value from the publisher function each ``yield`` such that - whatever is returned is sent as the published value to subscribers of - that topic. By default this is a dict ``{topic: str: value: Any}``. - - As an example, to make a subscriber call the above function: - - .. code:: python - - from functools import partial - import tractor - - async with tractor.open_nursery() as n: - portal = n.run_in_actor( - 'publisher', # actor name - partial( # func to execute in it - pub_service, - topics=('clicks', 'users'), - task_name='source1', - ) - ) - async for value in await portal.result(): - print(f"Subscriber received {value}") - - - Here, you don't need to provide the ``ctx`` argument since the - remote actor provides it automatically to the spawned task. If you - were to call ``pub_service()`` directly from a wrapping function you - would need to provide this explicitly. - - Remember you only need this if you need *a finite set of tasks* - running in a single actor to stream data to an arbitrary number of - subscribers. If you are ok to have a new task running for every call - to ``pub_service()`` then probably don't need this. - """ - global _pubtask2lock - - # handle the decorator not called with () case - if wrapped is None: - return partial(pub, tasks=tasks) - - task2lock: Dict[str, trio.StrictFIFOLock] = {} - - for name in tasks: - task2lock[name] = trio.StrictFIFOLock() - - @wrapt.decorator - async def wrapper(agen, instance, args, kwargs): - - # XXX: this is used to extract arguments properly as per the - # `wrapt` docs - async def _execute( - ctx: Context, - topics: Set[str], - *args, - # *, - task_name: str = None, # default: only one task allocated - packetizer: Callable = None, - **kwargs, - ): - if task_name is None: - task_name = trio.lowlevel.current_task().name - - if tasks and task_name not in tasks: - raise TypeError( - f"{agen} must be called with a `task_name` named " - f"argument with a value from {tasks}") - - elif not tasks and not task2lock: - # add a default root-task lock if none defined - task2lock[task_name] = trio.StrictFIFOLock() - - _pubtask2lock.update(task2lock) - - topics = set(topics) - lock = _pubtask2lock[task_name] - - all_subs = _pub_state.setdefault('_subs', {}) - topics2ctxs = all_subs.setdefault(task_name, {}) - - try: - modify_subs(topics2ctxs, topics, ctx) - # block and let existing feed task deliver - # stream data until it is cancelled in which case - # the next waiting task will take over and spawn it again - async with lock: - # no data feeder task yet; so start one - respawn = True - while respawn: - respawn = False - log.info( - f"Spawning data feed task for {funcname}") - try: - # unblocks when no more symbols subscriptions exist - # and the streamer task terminates - await fan_out_to_ctxs( - pub_async_gen_func=partial( - agen, *args, **kwargs), - topics2ctxs=topics2ctxs, - packetizer=packetizer, - ) - log.info( - f"Terminating stream task {task_name or ''}" - f" for {agen.__name__}") - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True - finally: - # remove all subs for this context - modify_subs(topics2ctxs, set(), ctx) - - # if there are truly no more subscriptions with this broker - # drop from broker subs dict - if not any(topics2ctxs.values()): - log.info( - f"No more subscriptions for publisher {task_name}") - - # invoke it - await _execute(*args, **kwargs) - - funcname = wrapped.__name__ - if not inspect.isasyncgenfunction(wrapped): - raise TypeError( - f"Publisher {funcname} must be an async generator function" - ) - if 'get_topics' not in inspect.signature(wrapped).parameters: - raise TypeError( - f"Publisher async gen {funcname} must define a " - "`get_topics` argument" - ) - - # XXX: manually monkey the wrapped function since - # ``wrapt.decorator`` doesn't seem to want to play nice with its - # whole "adapter" thing... - wrapped._tractor_stream_function = True # type: ignore - - return wrapper(wrapped) +'''