Compare commits

...

2 Commits

Author SHA1 Message Date
Tyler Goodlet 954554221f Add a @pub kwarg to allow specifying a "startup response message" 2021-04-27 11:54:07 -04:00
Tyler Goodlet 29fd956077 Make SIGINT handler kill the process tree
The std lib's `pdb` internals override SIGINT handling whenever one
enters the debugger repl. Force a handler that kills the tree if SIGINT
is triggered from the root actor, otherwise igore it since supervised
children should be managed already. This resolves an issue with guest
mode where `pdb` causes SIGINTs to be swallowed resulting in the host
loop never terminating the process tree.
2021-04-27 11:35:35 -04:00
2 changed files with 8 additions and 1 deletions

View File

@ -248,6 +248,7 @@ def _breakpoint(debug_func) -> Awaitable[None]:
# may have the tty locked prior # may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore if _debug_lock.locked(): # root process already has it; ignore
return return
await _debug_lock.acquire() await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release _pdb_release_hook = _debug_lock.release

View File

@ -97,6 +97,7 @@ def pub(
wrapped: typing.Callable = None, wrapped: typing.Callable = None,
*, *,
tasks: Set[str] = set(), tasks: Set[str] = set(),
send_on_connect: Any = None,
): ):
"""Publisher async generator decorator. """Publisher async generator decorator.
@ -182,7 +183,7 @@ def pub(
# handle the decorator not called with () case # handle the decorator not called with () case
if wrapped is None: if wrapped is None:
return partial(pub, tasks=tasks) return partial(pub, tasks=tasks, send_on_connect=send_on_connect)
task2lock: Dict[str, trio.StrictFIFOLock] = {} task2lock: Dict[str, trio.StrictFIFOLock] = {}
@ -225,6 +226,11 @@ def pub(
try: try:
modify_subs(topics2ctxs, topics, ctx) modify_subs(topics2ctxs, topics, ctx)
# if specified send the startup message back to consumer
if send_on_connect is not None:
await ctx.send_yield(send_on_connect)
# block and let existing feed task deliver # block and let existing feed task deliver
# stream data until it is cancelled in which case # stream data until it is cancelled in which case
# the next waiting task will take over and spawn it again # the next waiting task will take over and spawn it again