diff --git a/piker/data/_source.py b/piker/data/_source.py index 9b9b323d..0677df65 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -59,6 +59,19 @@ tf_in_1m = { } +def mk_fqsn( + provider: str, + symbol: str, + +) -> str: + ''' + Generate a "fully qualified symbol name" which is + a reverse-hierarchical cross broker/provider symbol + + ''' + return '.'.join([symbol, provider]).lower() + + def float_digits( value: float, ) -> int: @@ -118,6 +131,12 @@ class Symbol(BaseModel): self.key, ) + def iterfqsns(self) -> list[str]: + return [ + mk_fqsn(self.key, broker) + for broker in self.broker_info.keys() + ] + @validate_arguments def mk_symbol( @@ -129,7 +148,8 @@ def mk_symbol( broker_info: dict[str, Any] = {}, ) -> Symbol: - '''Create and return an instrument description for the + ''' + Create and return an instrument description for the "symbol" named as ``key``. ''' diff --git a/piker/data/feed.py b/piker/data/feed.py index 75a37545..b28c958e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -47,7 +47,12 @@ from ._sharedmem import ( ShmArray, ) from .ingest import get_ingestormod -from ._source import base_iohlc_dtype, mk_symbol, Symbol +from ._source import ( + base_iohlc_dtype, + mk_symbol, + Symbol, + mk_fqsn, +) from ..ui import _search from ._sampling import ( _shms, @@ -276,11 +281,20 @@ async def open_feed_bus( bus._subscribers.setdefault(symbol, []) + fs = mk_fqsn(symbol, brokername) + # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery async with bus.task_lock: if entry is None: + + if not start_stream: + raise RuntimeError( + f'No stream feed exists for {fs}?\n' + f'You may need a `brokerd` started first.' + ) + init_msg, first_quotes = await bus.nursery.start( partial( allocate_persistent_feed, @@ -307,6 +321,7 @@ async def open_feed_bus( await ctx.started((init_msg, first_quotes)) if not start_stream: + log.warning(f'Not opening real-time stream for {fs}') await trio.sleep_forever() async with ( @@ -337,20 +352,19 @@ async def open_feed_bus( try: uid = ctx.chan.uid - fqsn = f'{symbol}.{brokername}' async for msg in stream: if msg == 'pause': if sub in subs: log.info( - f'Pausing {fqsn} feed for {uid}') + f'Pausing {fs} feed for {uid}') subs.remove(sub) elif msg == 'resume': if sub not in subs: log.info( - f'Resuming {fqsn} feed for {uid}') + f'Resuming {fs} feed for {uid}') subs.append(sub) else: raise ValueError(msg) @@ -614,8 +628,7 @@ async def maybe_open_feed( 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), 'backpressure': kwargs.get('backpressure'), - 'backpressure': kwargs.get('backpressure'), - 'start_stream': kwargs.get('start_stream'), + 'start_stream': kwargs.get('start_stream', True), }, key=sym, ) as (cache_hit, feed):