diff --git a/tractor/msg.py b/tractor/msg.py index 560e644..6e7470e 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -101,6 +101,7 @@ def pub( wrapped: typing.Callable = None, *, tasks: Set[str] = set(), + send_on_connect: Any = None, ): """Publisher async generator decorator. @@ -186,7 +187,7 @@ def pub( # handle the decorator not called with () case if wrapped is None: - return partial(pub, tasks=tasks) + return partial(pub, tasks=tasks, send_on_connect=send_on_connect) task2lock: Dict[str, trio.StrictFIFOLock] = {} @@ -229,6 +230,11 @@ def pub( try: modify_subs(topics2ctxs, topics, ctx) + + # if specified send the startup message back to consumer + if send_on_connect is not None: + await ctx.send_yield(send_on_connect) + # block and let existing feed task deliver # stream data until it is cancelled in which case # the next waiting task will take over and spawn it again