forked from goodboy/tractor
1
0
Fork 0

Compare commits

...

1 Commits

Author SHA1 Message Date
Tyler Goodlet 9a5dcbbd31 Add a @pub kwarg to allow specifying a "startup response message" 2021-11-01 12:11:22 -04:00
1 changed files with 7 additions and 1 deletions

View File

@ -121,6 +121,7 @@ def pub(
wrapped: typing.Callable = None, wrapped: typing.Callable = None,
*, *,
tasks: Set[str] = set(), tasks: Set[str] = set(),
send_on_connect: Any = None,
): ):
"""Publisher async generator decorator. """Publisher async generator decorator.
@ -206,7 +207,7 @@ def pub(
# handle the decorator not called with () case # handle the decorator not called with () case
if wrapped is None: if wrapped is None:
return partial(pub, tasks=tasks) return partial(pub, tasks=tasks, send_on_connect=send_on_connect)
task2lock: Dict[str, trio.StrictFIFOLock] = {} task2lock: Dict[str, trio.StrictFIFOLock] = {}
@ -249,6 +250,11 @@ def pub(
try: try:
modify_subs(topics2ctxs, topics, ctx) modify_subs(topics2ctxs, topics, ctx)
# if specified send the startup message back to consumer
if send_on_connect is not None:
await ctx.send_yield(send_on_connect)
# block and let existing feed task deliver # block and let existing feed task deliver
# stream data until it is cancelled in which case # stream data until it is cancelled in which case
# the next waiting task will take over and spawn it again # the next waiting task will take over and spawn it again