Woops, need to use `.push_async_callback()`

For non-full-`.__aexit__()` handlers need this method instead (facepalm).
Also create and assign the `AnnotCtl._annot_stack: AsyncExitStack` just
before yielding the client since it's not needed prior and ensures annot
removal happens **before** ipc teardown.
distribute_dis
Tyler Goodlet 2023-12-24 15:08:44 -05:00
parent bbb98597a0
commit 1d7e97a295
1 changed files with 20 additions and 14 deletions

View File

@ -24,6 +24,7 @@ from contextlib import (
asynccontextmanager as acm,
AsyncExitStack,
)
from functools import partial
from pprint import pformat
from typing import (
# Any,
@ -164,7 +165,10 @@ class AnnotCtl(Struct):
'''
ctx2fqmes: dict[str, str]
fqme2ipc: dict[str, MsgStream]
_annot_stack = AsyncExitStack()
_annot_stack: AsyncExitStack
# runtime-populated mapping of all annotation
# ids to their equivalent IPC msg-streams.
_ipcs: dict[int, MsgStream] = {}
async def add_rect(
@ -209,8 +213,11 @@ class AnnotCtl(Struct):
aid: int = await ipc.receive()
self._ipcs[aid] = ipc
if not from_acm:
self._annot_stack.push_async_exit(
self.remove(aid)
self._annot_stack.push_async_callback(
partial(
self.remove,
aid,
)
)
return aid
@ -294,15 +301,7 @@ async def open_annot_ctl(
ctx2fqmes: dict[str, set[str]] = {}
fqme2ipc: dict[str, MsgStream] = {}
stream_ctxs: list[AsyncContextManager] = []
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2ipc=fqme2ipc,
# _annot_stack=annots_stack,
)
async with (
# AsyncExitStack() as annots_stack,
client._annot_stack, # as astack,
trionics.gather_contexts(ctx_mngrs) as ctxs,
):
for (ctx, fqmes) in ctxs:
@ -330,6 +329,13 @@ async def open_annot_ctl(
for fqme in fqmes:
fqme2ipc[fqme] = stream
# NOTE: on graceful teardown we always attempt to
# remove all annots that were created by the
# entering client.
async with AsyncExitStack() as annots_stack:
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2ipc=fqme2ipc,
_annot_stack=annots_stack,
)
yield client
# TODO: on graceful teardown should we try to
# remove all annots that were created/modded?