diff --git a/piker/data/feed.py b/piker/data/feed.py index 6fc3bb1b..db6c0c47 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -45,10 +45,7 @@ import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor -from tractor.trionics import ( - maybe_open_context, - gather_contexts, -) +from tractor import trionics from piker.accounting import ( MktPair, @@ -774,7 +771,7 @@ async def maybe_open_feed( ''' fqme = fqmes[0] - async with maybe_open_context( + async with trionics.maybe_open_context( acm_func=open_feed, kwargs={ 'fqmes': fqmes, @@ -794,7 +791,7 @@ async def maybe_open_feed( # add a new broadcast subscription for the quote stream # if this feed is likely already in use - async with gather_contexts( + async with trionics.gather_contexts( mngrs=[stream.subscribe() for stream in feed.streams.values()] ) as bstreams: for bstream, flume in zip(bstreams, feed.flumes.values()): @@ -854,7 +851,7 @@ async def open_feed( ) portals: tuple[tractor.Portal] - async with gather_contexts( + async with trionics.gather_contexts( brokerd_ctxs, ) as portals: @@ -906,7 +903,7 @@ async def open_feed( assert len(feed.mods) == len(feed.portals) async with ( - gather_contexts(bus_ctxs) as ctxs, + trionics.gather_contexts(bus_ctxs) as ctxs, ): stream_ctxs: list[tractor.MsgStream] = [] for ( @@ -948,7 +945,7 @@ async def open_feed( brokermod: ModuleType fqmes: list[str] async with ( - gather_contexts(stream_ctxs) as streams, + trionics.gather_contexts(stream_ctxs) as streams, ): for ( stream, @@ -964,6 +961,12 @@ async def open_feed( if brokermod.name == flume.mkt.broker: flume.stream = stream - assert len(feed.mods) == len(feed.portals) == len(feed.streams) + assert ( + len(feed.mods) + == + len(feed.portals) + == + len(feed.streams) + ) yield feed