Add `mk_` prefix since assignments will use `fqsn`

async_hist_loading
Tyler Goodlet 2022-02-19 16:09:14 -05:00
parent 832e4c97d2
commit 1d3ed6c333
2 changed files with 40 additions and 7 deletions

View File

@ -59,6 +59,19 @@ tf_in_1m = {
} }
def mk_fqsn(
provider: str,
symbol: str,
) -> str:
'''
Generate a "fully qualified symbol name" which is
a reverse-hierarchical cross broker/provider symbol
'''
return '.'.join([symbol, provider]).lower()
def float_digits( def float_digits(
value: float, value: float,
) -> int: ) -> int:
@ -118,6 +131,12 @@ class Symbol(BaseModel):
self.key, self.key,
) )
def iterfqsns(self) -> list[str]:
return [
mk_fqsn(self.key, broker)
for broker in self.broker_info.keys()
]
@validate_arguments @validate_arguments
def mk_symbol( def mk_symbol(
@ -129,7 +148,8 @@ def mk_symbol(
broker_info: dict[str, Any] = {}, broker_info: dict[str, Any] = {},
) -> Symbol: ) -> Symbol:
'''Create and return an instrument description for the '''
Create and return an instrument description for the
"symbol" named as ``key``. "symbol" named as ``key``.
''' '''

View File

@ -47,7 +47,12 @@ from ._sharedmem import (
ShmArray, ShmArray,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
from ._source import base_iohlc_dtype, mk_symbol, Symbol from ._source import (
base_iohlc_dtype,
mk_symbol,
Symbol,
mk_fqsn,
)
from ..ui import _search from ..ui import _search
from ._sampling import ( from ._sampling import (
_shms, _shms,
@ -276,11 +281,20 @@ async def open_feed_bus(
bus._subscribers.setdefault(symbol, []) bus._subscribers.setdefault(symbol, [])
fs = mk_fqsn(symbol, brokername)
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # brokerd yet, start persistent stream and shm writer task in
# service nursery # service nursery
async with bus.task_lock: async with bus.task_lock:
if entry is None: if entry is None:
if not start_stream:
raise RuntimeError(
f'No stream feed exists for {fs}?\n'
f'You may need a `brokerd` started first.'
)
init_msg, first_quotes = await bus.nursery.start( init_msg, first_quotes = await bus.nursery.start(
partial( partial(
allocate_persistent_feed, allocate_persistent_feed,
@ -307,6 +321,7 @@ async def open_feed_bus(
await ctx.started((init_msg, first_quotes)) await ctx.started((init_msg, first_quotes))
if not start_stream: if not start_stream:
log.warning(f'Not opening real-time stream for {fs}')
await trio.sleep_forever() await trio.sleep_forever()
async with ( async with (
@ -337,20 +352,19 @@ async def open_feed_bus(
try: try:
uid = ctx.chan.uid uid = ctx.chan.uid
fqsn = f'{symbol}.{brokername}'
async for msg in stream: async for msg in stream:
if msg == 'pause': if msg == 'pause':
if sub in subs: if sub in subs:
log.info( log.info(
f'Pausing {fqsn} feed for {uid}') f'Pausing {fs} feed for {uid}')
subs.remove(sub) subs.remove(sub)
elif msg == 'resume': elif msg == 'resume':
if sub not in subs: if sub not in subs:
log.info( log.info(
f'Resuming {fqsn} feed for {uid}') f'Resuming {fs} feed for {uid}')
subs.append(sub) subs.append(sub)
else: else:
raise ValueError(msg) raise ValueError(msg)
@ -614,8 +628,7 @@ async def maybe_open_feed(
'loglevel': loglevel, 'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'), 'tick_throttle': kwargs.get('tick_throttle'),
'backpressure': kwargs.get('backpressure'), 'backpressure': kwargs.get('backpressure'),
'backpressure': kwargs.get('backpressure'), 'start_stream': kwargs.get('start_stream', True),
'start_stream': kwargs.get('start_stream'),
}, },
key=sym, key=sym,
) as (cache_hit, feed): ) as (cache_hit, feed):