Drop passing `bus` to `tsp.manage_history()` in feed allocator

distribute_dis
Tyler Goodlet 2023-12-22 21:44:38 -05:00
parent 5b0c94933b
commit a681b2f0bb
1 changed files with 9 additions and 4 deletions

View File

@ -121,6 +121,8 @@ class _FeedsBus(Struct):
trio.CancelScope] = trio.TASK_STATUS_IGNORED, trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# TODO: shouldn't this be a direct await to avoid
# cancellation contagion to the bus nursery!?!?!
await self.nursery.start( await self.nursery.start(
target, target,
*args, *args,
@ -327,7 +329,6 @@ async def allocate_persistent_feed(
) = await bus.nursery.start( ) = await bus.nursery.start(
manage_history, manage_history,
mod, mod,
bus,
mkt, mkt,
some_data_ready, some_data_ready,
feed_is_live, feed_is_live,
@ -456,8 +457,12 @@ async def open_feed_bus(
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker
get_console_log(loglevel or tractor.current_actor().loglevel) # logging
get_console_log(
loglevel
or tractor.current_actor().loglevel
)
# local state sanity checks # local state sanity checks
# TODO: check for any stale shm entries for this symbol # TODO: check for any stale shm entries for this symbol
@ -467,7 +472,7 @@ async def open_feed_bus(
assert 'brokerd' in servicename assert 'brokerd' in servicename
assert brokername in servicename assert brokername in servicename
bus = get_feed_bus(brokername) bus: _FeedsBus = get_feed_bus(brokername)
sub_registered = trio.Event() sub_registered = trio.Event()
flumes: dict[str, Flume] = {} flumes: dict[str, Flume] = {}