From a681b2f0bbc7fe03a00b0fb931527b4c901f5140 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Dec 2023 21:44:38 -0500 Subject: [PATCH] Drop passing `bus` to `tsp.manage_history()` in feed allocator --- piker/data/feed.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index db6c0c47..bd9812c8 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -121,6 +121,8 @@ class _FeedsBus(Struct): trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: with trio.CancelScope() as cs: + # TODO: shouldn't this be a direct await to avoid + # cancellation contagion to the bus nursery!?!?! await self.nursery.start( target, *args, @@ -327,7 +329,6 @@ async def allocate_persistent_feed( ) = await bus.nursery.start( manage_history, mod, - bus, mkt, some_data_ready, feed_is_live, @@ -456,8 +457,12 @@ async def open_feed_bus( if loglevel is None: loglevel = tractor.current_actor().loglevel - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) + # XXX: required to propagate ``tractor`` loglevel to piker + # logging + get_console_log( + loglevel + or tractor.current_actor().loglevel + ) # local state sanity checks # TODO: check for any stale shm entries for this symbol @@ -467,7 +472,7 @@ async def open_feed_bus( assert 'brokerd' in servicename assert brokername in servicename - bus = get_feed_bus(brokername) + bus: _FeedsBus = get_feed_bus(brokername) sub_registered = trio.Event() flumes: dict[str, Flume] = {}