Use explicit `.data.feed` import of `tractor.trionics`

distribute_dis
Tyler Goodlet 2023-12-21 20:26:45 -05:00
parent 8d324acf91
commit 5a60974990
1 changed files with 13 additions and 10 deletions

View File

@ -45,10 +45,7 @@ import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor.trionics import ( from tractor import trionics
maybe_open_context,
gather_contexts,
)
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
@ -774,7 +771,7 @@ async def maybe_open_feed(
''' '''
fqme = fqmes[0] fqme = fqmes[0]
async with maybe_open_context( async with trionics.maybe_open_context(
acm_func=open_feed, acm_func=open_feed,
kwargs={ kwargs={
'fqmes': fqmes, 'fqmes': fqmes,
@ -794,7 +791,7 @@ async def maybe_open_feed(
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
# if this feed is likely already in use # if this feed is likely already in use
async with gather_contexts( async with trionics.gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()] mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams: ) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()): for bstream, flume in zip(bstreams, feed.flumes.values()):
@ -854,7 +851,7 @@ async def open_feed(
) )
portals: tuple[tractor.Portal] portals: tuple[tractor.Portal]
async with gather_contexts( async with trionics.gather_contexts(
brokerd_ctxs, brokerd_ctxs,
) as portals: ) as portals:
@ -906,7 +903,7 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals) assert len(feed.mods) == len(feed.portals)
async with ( async with (
gather_contexts(bus_ctxs) as ctxs, trionics.gather_contexts(bus_ctxs) as ctxs,
): ):
stream_ctxs: list[tractor.MsgStream] = [] stream_ctxs: list[tractor.MsgStream] = []
for ( for (
@ -948,7 +945,7 @@ async def open_feed(
brokermod: ModuleType brokermod: ModuleType
fqmes: list[str] fqmes: list[str]
async with ( async with (
gather_contexts(stream_ctxs) as streams, trionics.gather_contexts(stream_ctxs) as streams,
): ):
for ( for (
stream, stream,
@ -964,6 +961,12 @@ async def open_feed(
if brokermod.name == flume.mkt.broker: if brokermod.name == flume.mkt.broker:
flume.stream = stream flume.stream = stream
assert len(feed.mods) == len(feed.portals) == len(feed.streams) assert (
len(feed.mods)
==
len(feed.portals)
==
len(feed.streams)
)
yield feed yield feed