diff --git a/piker/data/feed.py b/piker/data/feed.py index c9bb5e36..93989a58 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -50,9 +50,8 @@ from ._sharedmem import ( from .ingest import get_ingestormod from ._source import ( base_iohlc_dtype, - mk_symbol, Symbol, - mk_fqsn, + uncons_fqsn, ) from ..ui import _search from ._sampling import ( @@ -192,7 +191,7 @@ async def _setup_persistent_brokerd( async def manage_history( mod: ModuleType, bus: _FeedsBus, - symbol: str, + fqsn: str, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -206,8 +205,6 @@ async def manage_history( buffer. ''' - fqsn = mk_fqsn(mod.name, symbol) - # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. shm, opened = maybe_open_shm_array( @@ -226,7 +223,7 @@ async def manage_history( # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars - _ = await bus.nursery.start(mod.backfill_bars, symbol, shm) + _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) # yield back after client connect with filled shm task_status.started(shm) @@ -285,8 +282,6 @@ async def allocate_persistent_feed( except ImportError: mod = get_ingestormod(brokername) - fqsn = mk_fqsn(brokername, symbol) - # mem chan handed to broker backend so it can push real-time # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) @@ -295,28 +290,9 @@ async def allocate_persistent_feed( some_data_ready = trio.Event() feed_is_live = trio.Event() - # run 2 tasks: - # - a history loader / maintainer - # - a real-time streamer which consumers and sends new data to any - # consumers as well as writes to storage backends (as configured). - - # XXX: neither of these will raise but will cause an inf hang due to: - # https://github.com/python-trio/trio/issues/2258 - # bus.nursery.start_soon( - # await bus.start_task( - - shm = await bus.nursery.start( - manage_history, - mod, - bus, - symbol, - some_data_ready, - feed_is_live, - ) - # establish broker backend quote stream by calling # ``stream_quotes()``, which is a required broker backend endpoint. - init_msg, first_quotes = await bus.nursery.start( + init_msg, first_quote = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, @@ -325,11 +301,38 @@ async def allocate_persistent_feed( loglevel=loglevel, ) ) + # the broker-specific fully qualified symbol name + bfqsn = init_msg[symbol]['fqsn'] + + # HISTORY, run 2 tasks: + # - a history loader / maintainer + # - a real-time streamer which consumers and sends new data to any + # consumers as well as writes to storage backends (as configured). + + # XXX: neither of these will raise but will cause an inf hang due to: + # https://github.com/python-trio/trio/issues/2258 + # bus.nursery.start_soon( + # await bus.start_task( + shm = await bus.nursery.start( + manage_history, + mod, + bus, + bfqsn, + some_data_ready, + feed_is_live, + ) # we hand an IPC-msg compatible shm token to the caller so it # can read directly from the memory which will be written by # this task. - init_msg[symbol]['shm_token'] = shm.token + msg = init_msg[symbol] + msg['shm_token'] = shm.token + + # true fqsn + fqsn = '.'.join((bfqsn, brokername)) + + # add a fqsn entry that includes the ``.`` suffix + init_msg[fqsn] = msg # TODO: pretty sure we don't need this? why not just leave 1s as # the fastest "sample period" since we'll probably always want that @@ -342,8 +345,22 @@ async def allocate_persistent_feed( log.info(f'waiting on history to load: {fqsn}') await some_data_ready.wait() - bus.feeds[symbol.lower()] = (init_msg, first_quotes) - task_status.started((init_msg, first_quotes)) + # append ``.`` suffix to each quote symbol + bsym = symbol + f'.{brokername}' + generic_first_quotes = { + bsym: first_quote, + fqsn: first_quote, + } + + bus.feeds[symbol] = bus.feeds[fqsn] = ( + init_msg, + generic_first_quotes, + ) + # for ambiguous names we simply apply the retreived + # feed to that name (for now). + + # task_status.started((init_msg, generic_first_quotes)) + task_status.started() # backend will indicate when real-time quotes have begun. await feed_is_live.wait() @@ -358,10 +375,11 @@ async def allocate_persistent_feed( bus, shm, quote_stream, + brokername, sum_tick_vlm ) finally: - log.warning(f'{symbol}@{brokername} feed task terminated') + log.warning(f'{fqsn} feed task terminated') @tractor.context @@ -394,25 +412,16 @@ async def open_feed_bus( assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) - bus._subscribers.setdefault(symbol, []) - fqsn = mk_fqsn(brokername, symbol) - - entry = bus.feeds.get(symbol) # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery + entry = bus.feeds.get(symbol) if entry is None: - if not start_stream: - raise RuntimeError( - f'No stream feed exists for {fqsn}?\n' - f'You may need a `brokerd` started first.' - ) - - # allocate a new actor-local stream bus which will persist for - # this `brokerd`. + # allocate a new actor-local stream bus which + # will persist for this `brokerd`. async with bus.task_lock: - init_msg, first_quotes = await bus.nursery.start( + await bus.nursery.start( partial( allocate_persistent_feed, @@ -434,9 +443,30 @@ async def open_feed_bus( # subscriber init_msg, first_quotes = bus.feeds[symbol] + msg = init_msg[symbol] + bfqsn = msg['fqsn'] + + # true fqsn + fqsn = '.'.join([bfqsn, brokername]) + assert fqsn in first_quotes + assert bus.feeds[fqsn] + + # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) + bsym = symbol + f'.{brokername}' + assert bsym in first_quotes + + # we use the broker-specific fqsn (bfqsn) for + # the sampler subscription since the backend isn't (yet) + # expected to append it's own name to the fqsn, so we filter + # on keys which *do not* include that name (e.g .ib) . + bus._subscribers.setdefault(bfqsn, []) + # send this even to subscribers to existing feed? # deliver initial info message a first quote asap - await ctx.started((init_msg, first_quotes)) + await ctx.started(( + init_msg, + first_quotes, + )) if not start_stream: log.warning(f'Not opening real-time stream for {fqsn}') @@ -449,14 +479,12 @@ async def open_feed_bus( # re-send to trigger display loop cycle (necessary especially # when the mkt is closed and no real-time messages are # expected). - await stream.send(first_quotes) + await stream.send({fqsn: first_quotes}) + # open a bg task which receives quotes over a mem chan + # and only pushes them to the target actor-consumer at + # a max ``tick_throttle`` instantaneous rate. if tick_throttle: - - # open a bg task which receives quotes over a mem chan - # and only pushes them to the target actor-consumer at - # a max ``tick_throttle`` instantaneous rate. - send, recv = trio.open_memory_channel(2**10) cs = await bus.start_task( uniform_rate_send, @@ -469,12 +497,15 @@ async def open_feed_bus( else: sub = (stream, tick_throttle) - subs = bus._subscribers[symbol] + subs = bus._subscribers[bfqsn] subs.append(sub) try: uid = ctx.chan.uid + # ctrl protocol for start/stop of quote streams based on UI + # state (eg. don't need a stream when a symbol isn't being + # displayed). async for msg in stream: if msg == 'pause': @@ -499,7 +530,7 @@ async def open_feed_bus( # n.cancel_scope.cancel() cs.cancel() try: - bus._subscribers[symbol].remove(sub) + bus._subscribers[bfqsn].remove(sub) except ValueError: log.warning(f'{sub} for {symbol} was already removed?') @@ -625,10 +656,10 @@ async def install_brokerd_search( @asynccontextmanager async def open_feed( - brokername: str, - symbols: list[str], - loglevel: Optional[str] = None, + fqsns: list[str], + + loglevel: Optional[str] = None, backpressure: bool = True, start_stream: bool = True, tick_throttle: Optional[float] = None, # Hz @@ -638,7 +669,10 @@ async def open_feed( Open a "data feed" which provides streamed real-time quotes. ''' - sym = symbols[0].lower() + fqsn = fqsns[0].lower() + + brokername, key, suffix = uncons_fqsn(fqsn) + bfqsn = fqsn.replace('.' + brokername, '') try: mod = get_brokermod(brokername) @@ -659,7 +693,7 @@ async def open_feed( portal.open_context( open_feed_bus, brokername=brokername, - symbol=sym, + symbol=bfqsn, loglevel=loglevel, start_stream=start_stream, tick_throttle=tick_throttle, @@ -676,9 +710,10 @@ async def open_feed( ): # we can only read from shm shm = attach_shm_array( - token=init_msg[sym]['shm_token'], + token=init_msg[bfqsn]['shm_token'], readonly=True, ) + assert fqsn in first_quotes feed = Feed( name=brokername, @@ -691,17 +726,15 @@ async def open_feed( ) for sym, data in init_msg.items(): - si = data['symbol_info'] - - symbol = mk_symbol( - key=sym, - type_key=si.get('asset_type', 'forex'), - tick_size=si.get('price_tick_size', 0.01), - lot_tick_size=si.get('lot_tick_size', 0.0), + fqsn = data['fqsn'] + f'.{brokername}' + symbol = Symbol.from_fqsn( + fqsn, + info=si, ) - symbol.broker_info[brokername] = si + # symbol.broker_info[brokername] = si + feed.symbols[fqsn] = symbol feed.symbols[sym] = symbol # cast shm dtype to list... can't member why we need this @@ -725,8 +758,7 @@ async def open_feed( @asynccontextmanager async def maybe_open_feed( - brokername: str, - symbols: list[str], + fqsns: list[str], loglevel: Optional[str] = None, **kwargs, @@ -738,13 +770,12 @@ async def maybe_open_feed( in a tractor broadcast receiver. ''' - sym = symbols[0].lower() + fqsn = fqsns[0] async with maybe_open_context( acm_func=open_feed, kwargs={ - 'brokername': brokername, - 'symbols': [sym], + 'fqsns': fqsns, 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), @@ -752,11 +783,11 @@ async def maybe_open_feed( 'backpressure': kwargs.get('backpressure', True), 'start_stream': kwargs.get('start_stream', True), }, - key=sym, + key=fqsn, ) as (cache_hit, feed): if cache_hit: - log.info(f'Using cached feed for {brokername}.{sym}') + log.info(f'Using cached feed for {fqsn}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: