diff --git a/tractor/msg.py b/tractor/msg.py index 5b343b6..f9c537d 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -97,6 +97,7 @@ def pub( wrapped: typing.Callable = None, *, tasks: Set[str] = set(), + send_on_connect: Any = None, ): """Publisher async generator decorator. @@ -182,7 +183,7 @@ def pub( # handle the decorator not called with () case if wrapped is None: - return partial(pub, tasks=tasks) + return partial(pub, tasks=tasks, send_on_connect=send_on_connect) task2lock: Dict[str, trio.StrictFIFOLock] = {} @@ -225,6 +226,11 @@ def pub( try: modify_subs(topics2ctxs, topics, ctx) + + # if specified send the startup message back to consumer + if send_on_connect is not None: + await ctx.send_yield(send_on_connect) + # block and let existing feed task deliver # stream data until it is cancelled in which case # the next waiting task will take over and spawn it again