forked from goodboy/tractor
Compare commits
2 Commits
master
...
pub_connec
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 954554221f | |
Tyler Goodlet | 29fd956077 |
|
@ -248,6 +248,7 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
# may have the tty locked prior
|
# may have the tty locked prior
|
||||||
if _debug_lock.locked(): # root process already has it; ignore
|
if _debug_lock.locked(): # root process already has it; ignore
|
||||||
return
|
return
|
||||||
|
|
||||||
await _debug_lock.acquire()
|
await _debug_lock.acquire()
|
||||||
_pdb_release_hook = _debug_lock.release
|
_pdb_release_hook = _debug_lock.release
|
||||||
|
|
||||||
|
|
|
@ -97,6 +97,7 @@ def pub(
|
||||||
wrapped: typing.Callable = None,
|
wrapped: typing.Callable = None,
|
||||||
*,
|
*,
|
||||||
tasks: Set[str] = set(),
|
tasks: Set[str] = set(),
|
||||||
|
send_on_connect: Any = None,
|
||||||
):
|
):
|
||||||
"""Publisher async generator decorator.
|
"""Publisher async generator decorator.
|
||||||
|
|
||||||
|
@ -182,7 +183,7 @@ def pub(
|
||||||
|
|
||||||
# handle the decorator not called with () case
|
# handle the decorator not called with () case
|
||||||
if wrapped is None:
|
if wrapped is None:
|
||||||
return partial(pub, tasks=tasks)
|
return partial(pub, tasks=tasks, send_on_connect=send_on_connect)
|
||||||
|
|
||||||
task2lock: Dict[str, trio.StrictFIFOLock] = {}
|
task2lock: Dict[str, trio.StrictFIFOLock] = {}
|
||||||
|
|
||||||
|
@ -225,6 +226,11 @@ def pub(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
modify_subs(topics2ctxs, topics, ctx)
|
modify_subs(topics2ctxs, topics, ctx)
|
||||||
|
|
||||||
|
# if specified send the startup message back to consumer
|
||||||
|
if send_on_connect is not None:
|
||||||
|
await ctx.send_yield(send_on_connect)
|
||||||
|
|
||||||
# block and let existing feed task deliver
|
# block and let existing feed task deliver
|
||||||
# stream data until it is cancelled in which case
|
# stream data until it is cancelled in which case
|
||||||
# the next waiting task will take over and spawn it again
|
# the next waiting task will take over and spawn it again
|
||||||
|
|
Loading…
Reference in New Issue