Rename `bs_mktid` -> `bs_fqme` and drop (some) `fqsn`s

Since we have made `MktPair.bs_mktid` mean something else now, change
all the feed setup var names to instead be more representative of the
actual value: `bs_fqme: str` and use the new `MktPair.bs_fqme` where
necessary.
rekt_pps
Tyler Goodlet 2023-04-19 13:59:00 -04:00
parent 83802e932a
commit 4131ff1152
1 changed files with 29 additions and 32 deletions

View File

@ -1033,9 +1033,7 @@ async def allocate_persistent_feed(
[symstr], [symstr],
init_msgs, init_msgs,
) )
bs_mktid: str = init.bs_mktid
mkt: MktPair = init.mkt_info mkt: MktPair = init.mkt_info
assert mkt.bs_mktid == bs_mktid
fqme: str = mkt.fqme fqme: str = mkt.fqme
# HISTORY storage, run 2 tasks: # HISTORY storage, run 2 tasks:
@ -1150,14 +1148,14 @@ async def open_feed_bus(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str, brokername: str,
symbols: list[str], # normally expected to the broker-specific fqsn symbols: list[str], # normally expected to the broker-specific fqme
loglevel: str = 'error', loglevel: str = 'error',
tick_throttle: Optional[float] = None, tick_throttle: Optional[float] = None,
start_stream: bool = True, start_stream: bool = True,
) -> dict[ ) -> dict[
str, # fqsn str, # fqme
tuple[dict, dict] # pair of dicts of the initmsg and first quotes tuple[dict, dict] # pair of dicts of the initmsg and first quotes
]: ]:
''' '''
@ -1218,33 +1216,32 @@ async def open_feed_bus(
# XXX: ``.first_quote`` may be outdated here if this is secondary # XXX: ``.first_quote`` may be outdated here if this is secondary
# subscriber # subscriber
flume = bus.feeds[symbol] flume: Flume = bus.feeds[symbol]
sym = flume.symbol mkt: MktPair = flume.mkt
bs_mktid = sym.key bs_fqme: str = mkt.bs_fqme
fqsn = sym.fqme # true fqsn fqme: str = mkt.fqme
assert bs_mktid in fqsn and brokername in fqsn assert brokername in fqme
if sym.suffix: if mkt.suffix:
bs_mktid = fqsn.removesuffix(f'.{brokername}') log.warning(f'{brokername} expanded symbol {symbol} -> {bs_fqme}')
log.warning(f'{brokername} expanded symbol {symbol} -> {bs_mktid}')
# pack for ``.started()`` sync msg # pack for ``.started()`` sync msg
flumes[fqsn] = flume flumes[fqme] = flume
# we use the broker-specific market id (bs_mktid) for the # we use the broker-specific fqme (bs_fqme) for the
# sampler subscription since the backend isn't (yet) expected to # sampler subscription since the backend isn't (yet) expected to
# append it's own name to the fqsn, so we filter on keys which # append it's own name to the fqme, so we filter on keys which
# *do not* include that name (e.g .ib) . # *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bs_mktid, set()) bus._subscribers.setdefault(bs_fqme, set())
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
await ctx.started( await ctx.started(
{fqsn: flume.to_msg() {fqme: flume.to_msg()
for fqsn, flume in flumes.items()} for fqme, flume in flumes.items()}
) )
if not start_stream: if not start_stream:
log.warning(f'Not opening real-time stream for {fqsn}') log.warning(f'Not opening real-time stream for {fqme}')
await trio.sleep_forever() await trio.sleep_forever()
# real-time stream loop # real-time stream loop
@ -1258,11 +1255,11 @@ async def open_feed_bus(
): ):
local_subs: dict[str, set[tuple]] = {} local_subs: dict[str, set[tuple]] = {}
for fqsn, flume in flumes.items(): for fqme, flume in flumes.items():
# re-send to trigger display loop cycle (necessary especially # re-send to trigger display loop cycle (necessary especially
# when the mkt is closed and no real-time messages are # when the mkt is closed and no real-time messages are
# expected). # expected).
await stream.send({fqsn: flume.first_quote}) await stream.send({fqme: flume.first_quote})
# set a common msg stream for all requested symbols # set a common msg stream for all requested symbols
assert stream assert stream
@ -1304,9 +1301,9 @@ async def open_feed_bus(
# maybe use the current task-id to key the sub list that's # maybe use the current task-id to key the sub list that's
# added / removed? Or maybe we can add a general # added / removed? Or maybe we can add a general
# pause-resume by sub-key api? # pause-resume by sub-key api?
bs_mktid = fqsn.removesuffix(f'.{brokername}') bs_fqme = fqme.removesuffix(f'.{brokername}')
local_subs.setdefault(bs_mktid, set()).add(sub) local_subs.setdefault(bs_fqme, set()).add(sub)
bus.add_subs(bs_mktid, {sub}) bus.add_subs(bs_fqme, {sub})
# sync caller with all subs registered state # sync caller with all subs registered state
sub_registered.set() sub_registered.set()
@ -1319,16 +1316,16 @@ async def open_feed_bus(
async for msg in stream: async for msg in stream:
if msg == 'pause': if msg == 'pause':
for bs_mktid, subs in local_subs.items(): for bs_fqme, subs in local_subs.items():
log.info( log.info(
f'Pausing {bs_mktid} feed for {uid}') f'Pausing {bs_fqme} feed for {uid}')
bus.remove_subs(bs_mktid, subs) bus.remove_subs(bs_fqme, subs)
elif msg == 'resume': elif msg == 'resume':
for bs_mktid, subs in local_subs.items(): for bs_fqme, subs in local_subs.items():
log.info( log.info(
f'Resuming {bs_mktid} feed for {uid}') f'Resuming {bs_fqme} feed for {uid}')
bus.add_subs(bs_mktid, subs) bus.add_subs(bs_fqme, subs)
else: else:
raise ValueError(msg) raise ValueError(msg)
@ -1342,8 +1339,8 @@ async def open_feed_bus(
cs.cancel() cs.cancel()
# drop all subs for this task from the bus # drop all subs for this task from the bus
for bs_mktid, subs in local_subs.items(): for bs_fqme, subs in local_subs.items():
bus.remove_subs(bs_mktid, subs) bus.remove_subs(bs_fqme, subs)
class Feed(Struct): class Feed(Struct):