forked from goodboy/tractor
Do proper `wrapt` arg extraction for type checking
Use an inner function / closure to properly process required arguments at call time as is recommended in the `wrap` docs. Do async gen and arg introspection at decorate time and raise appropriate type errors.contexts
parent
1b405ab4fe
commit
3d0de25f93
119
tractor/msg.py
119
tractor/msg.py
|
@ -1,8 +1,9 @@
|
||||||
"""
|
"""
|
||||||
Messaging pattern APIs and helpers.
|
Messaging pattern APIs and helpers.
|
||||||
"""
|
"""
|
||||||
|
import inspect
|
||||||
import typing
|
import typing
|
||||||
from typing import Dict, Any, Set, Union
|
from typing import Dict, Any, Set, Union, Callable
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
|
|
||||||
|
@ -11,6 +12,7 @@ import wrapt
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from . import current_actor
|
from . import current_actor
|
||||||
|
from ._ipc import Context
|
||||||
|
|
||||||
__all__ = ['pub']
|
__all__ = ['pub']
|
||||||
|
|
||||||
|
@ -181,58 +183,79 @@ def pub(
|
||||||
|
|
||||||
@wrapt.decorator(adapter=takes_ctx)
|
@wrapt.decorator(adapter=takes_ctx)
|
||||||
async def wrapper(agen, instance, args, kwargs):
|
async def wrapper(agen, instance, args, kwargs):
|
||||||
task_name = None
|
# this is used to extract arguments properly as per
|
||||||
if tasks:
|
# the `wrapt` docs
|
||||||
try:
|
async def _execute(
|
||||||
task_name = kwargs.pop('task_name')
|
ctx: Context,
|
||||||
except KeyError:
|
topics: Set[str],
|
||||||
|
*args,
|
||||||
|
# *,
|
||||||
|
task_name: str = None,
|
||||||
|
packetizer: Callable = None,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
if tasks and task_name is None:
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
f"{agen} must be called with a `task_name` named argument "
|
f"{agen} must be called with a `task_name` named "
|
||||||
f"with a falue from {tasks}")
|
f"argument with a falue from {tasks}")
|
||||||
|
|
||||||
# pop required kwargs used internally
|
ss = current_actor().statespace
|
||||||
ctx = kwargs.pop('ctx')
|
lockmap = ss.setdefault('_pubtask2lock', task2lock)
|
||||||
topics = kwargs.pop('topics')
|
lock = lockmap[task_name]
|
||||||
packetizer = kwargs.pop('packetizer', None)
|
|
||||||
|
|
||||||
ss = current_actor().statespace
|
all_subs = ss.setdefault('_subs', {})
|
||||||
lockmap = ss.setdefault('_pubtask2lock', task2lock)
|
topics2ctxs = all_subs.setdefault(task_name, {})
|
||||||
lock = lockmap[task_name]
|
|
||||||
|
|
||||||
all_subs = ss.setdefault('_subs', {})
|
try:
|
||||||
topics2ctxs = all_subs.setdefault(task_name, {})
|
modify_subs(topics2ctxs, topics, ctx)
|
||||||
|
# block and let existing feed task deliver
|
||||||
|
# stream data until it is cancelled in which case
|
||||||
|
# the next waiting task will take over and spawn it again
|
||||||
|
async with lock:
|
||||||
|
# no data feeder task yet; so start one
|
||||||
|
respawn = True
|
||||||
|
while respawn:
|
||||||
|
respawn = False
|
||||||
|
log.info(
|
||||||
|
f"Spawning data feed task for {funcname}")
|
||||||
|
try:
|
||||||
|
# unblocks when no more symbols subscriptions exist
|
||||||
|
# and the streamer task terminates
|
||||||
|
await fan_out_to_ctxs(
|
||||||
|
pub_async_gen_func=partial(
|
||||||
|
agen, *args, **kwargs),
|
||||||
|
topics2ctxs=topics2ctxs,
|
||||||
|
packetizer=packetizer,
|
||||||
|
)
|
||||||
|
log.info(
|
||||||
|
f"Terminating stream task {task_name or ''}"
|
||||||
|
f" for {agen.__name__}")
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
log.exception("Respawning failed data feed task")
|
||||||
|
respawn = True
|
||||||
|
finally:
|
||||||
|
# remove all subs for this context
|
||||||
|
modify_subs(topics2ctxs, (), ctx)
|
||||||
|
|
||||||
try:
|
# if there are truly no more subscriptions with this broker
|
||||||
modify_subs(topics2ctxs, topics, ctx)
|
# drop from broker subs dict
|
||||||
# block and let existing feed task deliver
|
if not any(topics2ctxs.values()):
|
||||||
# stream data until it is cancelled in which case
|
log.info(
|
||||||
# the next waiting task will take over and spawn it again
|
f"No more subscriptions for publisher {task_name}")
|
||||||
async with lock:
|
|
||||||
# no data feeder task yet; so start one
|
|
||||||
respawn = True
|
|
||||||
while respawn:
|
|
||||||
respawn = False
|
|
||||||
log.info(f"Spawning data feed task for {agen.__name__}")
|
|
||||||
try:
|
|
||||||
# unblocks when no more symbols subscriptions exist
|
|
||||||
# and the streamer task terminates
|
|
||||||
await fan_out_to_ctxs(
|
|
||||||
pub_async_gen_func=partial(agen, *args, **kwargs),
|
|
||||||
topics2ctxs=topics2ctxs,
|
|
||||||
packetizer=packetizer,
|
|
||||||
)
|
|
||||||
log.info(f"Terminating stream task {task_name or ''}"
|
|
||||||
f" for {agen.__name__}")
|
|
||||||
except trio.BrokenResourceError:
|
|
||||||
log.exception("Respawning failed data feed task")
|
|
||||||
respawn = True
|
|
||||||
finally:
|
|
||||||
# remove all subs for this context
|
|
||||||
modify_subs(topics2ctxs, (), ctx)
|
|
||||||
|
|
||||||
# if there are truly no more subscriptions with this broker
|
# invoke it
|
||||||
# drop from broker subs dict
|
await _execute(*args, **kwargs)
|
||||||
if not any(topics2ctxs.values()):
|
|
||||||
log.info(f"No more subscriptions for publisher {task_name}")
|
|
||||||
|
funcname = wrapped.__name__
|
||||||
|
if not inspect.isasyncgenfunction(wrapped):
|
||||||
|
raise TypeError(
|
||||||
|
f"Publisher {funcname} must be an async generator function"
|
||||||
|
)
|
||||||
|
if 'get_topics' not in inspect.signature(wrapped).parameters:
|
||||||
|
raise TypeError(
|
||||||
|
f"Publisher async gen {funcname} must define a "
|
||||||
|
"`get_topics` argument"
|
||||||
|
)
|
||||||
|
|
||||||
return wrapper(wrapped)
|
return wrapper(wrapped)
|
||||||
|
|
Loading…
Reference in New Issue