From a8cb6c20563716f1a5a6f71aa6f42b0bdc96b5de Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 18 Mar 2022 14:47:28 -0400
Subject: [PATCH] Make the data feed layer "fqsn" aware

In order to support instruments with lifetimes (aka derivatives) we
generally need special symbol annotations which detail such metadata
(such as `MNQ.GLOBEX.20220717` for nasdaq futes). Further, there is
really no reason for the public API of this feed layer to care about
getting a special "brokername" field since the data request generally
comes directly from UIs (eg. search selection), so we might as well
accept a fqsn (fully qualified symbol name) which includes the broker
name; for now as a suffix like `'.ib'`. We may change this schema
(soon) but this at least gets us to a point where we expect the full
name including broker/provider.

An additional detail: for certain "generic" symbol names (like for
futes) we will pull a so-called "front contract" and map it to a
specific fqsn underneath, so there is a double (cached) entry for that
symbol such that other consumers can use either name the same way if
desired.

Some other machinery changes:
- expect the `stream_quotes()` endpoint to deliver its `.started()` msg
  almost immediately since we now need it to deliver the fqsn asap
  (yes, this means the ep should no longer wait on a "live" first quote
  and should instead deliver what quote data it can right away).
- expect the quotes ohlc sampler task to add in the broker name before
  broadcasting to remote (actor) consumers since the backend isn't
  (yet) expected to do that itself.
- start using all the new fqsn-related `Symbol` APIs.
---
 piker/data/feed.py | 182 +++++++++++++++++++++++++++------------------
 1 file changed, 111 insertions(+), 71 deletions(-)

diff --git a/piker/data/feed.py b/piker/data/feed.py
index fcc4ed8f..3dc5c9b1 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -55,7 +55,7 @@ from .ingest import get_ingestormod
 from ._source import (
     base_iohlc_dtype,
     Symbol,
-    mk_fqsn,
+    uncons_fqsn,
 )
 from ..ui import _search
 from ._sampling import (
@@ -195,7 +195,7 @@ async def _setup_persistent_brokerd(
 async def manage_history(
     mod: ModuleType,
     bus: _FeedsBus,
-    symbol: str,
+    fqsn: str,
     some_data_ready: trio.Event,
     feed_is_live: trio.Event,

@@ -209,8 +209,6 @@ async def manage_history(
     buffer.

     '''
-    fqsn = mk_fqsn(mod.name, symbol)
-
     # (maybe) allocate shm array for this broker/symbol which will
     # be used for fast near-term history capture and processing.
     shm, opened = maybe_open_shm_array(
@@ -232,10 +230,13 @@
     async with marketstore.open_storage_client(
         fqsn,
     ) as (storage, tsdb_arrays):
+
         # TODO: history validation
         # assert opened, f'Persistent shm for {symbol} was already open?!'
         # if not opened:
-        #     raise RuntimeError("Persistent shm for sym was already open?!")
+        #     raise RuntimeError(
+        #         "Persistent shm for sym was already open?!"
+        #     )

         if tsdb_arrays:
             log.info(f'Loaded tsdb history {tsdb_arrays}')
@@ -264,7 +265,7 @@
         )

     # start history anal and load missing new data via backend.
-    async with mod.open_history_client(symbol) as hist:
+    async with mod.open_history_client(fqsn) as hist:

         # get latest query's worth of history
         array, next_dt = await hist(end_dt='')
@@ -280,7 +281,7 @@
             # start history backfill task ``backfill_bars()`` is
             # a required backend func this must block until shm is
             # filled with first set of ohlc bars
-            _ = await bus.nursery.start(mod.backfill_bars, symbol, shm)
+            _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)

     # yield back after client connect with filled shm
     task_status.started(shm)
@@ -341,8 +342,6 @@
     except ImportError:
         mod = get_ingestormod(brokername)

-    fqsn = mk_fqsn(brokername, symbol)
-
     # mem chan handed to broker backend so it can push real-time
     # quotes to this task for sampling and history storage (see below).
     send, quote_stream = trio.open_memory_channel(10)
@@ -351,28 +350,9 @@
     some_data_ready = trio.Event()
     feed_is_live = trio.Event()

-    # run 2 tasks:
-    # - a history loader / maintainer
-    # - a real-time streamer which consumers and sends new data to any
-    #   consumers as well as writes to storage backends (as configured).
-
-    # XXX: neither of these will raise but will cause an inf hang due to:
-    # https://github.com/python-trio/trio/issues/2258
-    # bus.nursery.start_soon(
-    # await bus.start_task(
-
-    shm = await bus.nursery.start(
-        manage_history,
-        mod,
-        bus,
-        symbol,
-        some_data_ready,
-        feed_is_live,
-    )
-
     # establish broker backend quote stream by calling
     # ``stream_quotes()``, which is a required broker backend endpoint.
-    init_msg, first_quotes = await bus.nursery.start(
+    init_msg, first_quote = await bus.nursery.start(
         partial(
             mod.stream_quotes,
             send_chan=send,
@@ -381,15 +361,38 @@
             loglevel=loglevel,
         )
     )
+    # the broker-specific fully qualified symbol name
+    bfqsn = init_msg[symbol]['fqsn']
+
+    # HISTORY, run 2 tasks:
+    # - a history loader / maintainer
+    # - a real-time streamer which consumes and sends new data to any
+    #   consumers as well as writes to storage backends (as configured).
+
+    # XXX: neither of these will raise but will cause an inf hang due to:
+    # https://github.com/python-trio/trio/issues/2258
+    # bus.nursery.start_soon(
+    # await bus.start_task(
+    shm = await bus.nursery.start(
+        manage_history,
+        mod,
+        bus,
+        bfqsn,
+        some_data_ready,
+        feed_is_live,
+    )

     # we hand an IPC-msg compatible shm token to the caller so it
     # can read directly from the memory which will be written by
     # this task.
-    init_msg[symbol]['shm_token'] = shm.token
-    # symbol = Symbol.from_broker_info(
-    #     fqsn,
-    #     init_msg[symbol]['symbol_info']
-    # )
+    msg = init_msg[symbol]
+    msg['shm_token'] = shm.token
+
+    # true fqsn
+    fqsn = '.'.join((bfqsn, brokername))
+
+    # add a fqsn entry that includes the ``.<broker>`` suffix
+    init_msg[fqsn] = msg

     # TODO: pretty sure we don't need this? why not just leave 1s as
     # the fastest "sample period" since we'll probably always want that
@@ -402,8 +405,22 @@
     log.info(f'waiting on history to load: {fqsn}')
     await some_data_ready.wait()

-    bus.feeds[symbol.lower()] = (init_msg, first_quotes)
-    task_status.started((init_msg, first_quotes))
+    # append ``.<broker>`` suffix to each quote symbol
+    bsym = symbol + f'.{brokername}'
+    generic_first_quotes = {
+        bsym: first_quote,
+        fqsn: first_quote,
+    }
+
+    bus.feeds[symbol] = bus.feeds[fqsn] = (
+        init_msg,
+        generic_first_quotes,
+    )
+    # for ambiguous names we simply apply the retrieved
+    # feed to that name (for now).
+
+    # task_status.started((init_msg, generic_first_quotes))
+    task_status.started()

     if not start_stream:
         await trio.sleep_forever()
@@ -421,10 +438,11 @@
             bus,
             shm,
             quote_stream,
+            brokername,
             sum_tick_vlm
         )
     finally:
-        log.warning(f'{symbol}@{brokername} feed task terminated')
+        log.warning(f'{fqsn} feed task terminated')


 @tractor.context
@@ -457,19 +475,16 @@
     assert 'brokerd' in tractor.current_actor().name

     bus = get_feed_bus(brokername)
-    bus._subscribers.setdefault(symbol, [])
-    fqsn = mk_fqsn(brokername, symbol)
-
-    entry = bus.feeds.get(symbol)

     # if no cached feed for this symbol has been created for this
     # brokerd yet, start persistent stream and shm writer task in
     # service nursery
+    entry = bus.feeds.get(symbol)
     if entry is None:
-        # allocate a new actor-local stream bus which will persist for
-        # this `brokerd`.
+        # allocate a new actor-local stream bus which
+        # will persist for this `brokerd`.
         async with bus.task_lock:
-            init_msg, first_quotes = await bus.nursery.start(
+            await bus.nursery.start(
                 partial(
                     allocate_persistent_feed,

@@ -490,9 +505,30 @@
     # subscriber
     init_msg, first_quotes = bus.feeds[symbol]

+    msg = init_msg[symbol]
+    bfqsn = msg['fqsn']
+
+    # true fqsn
+    fqsn = '.'.join([bfqsn, brokername])
+    assert fqsn in first_quotes
+    assert bus.feeds[fqsn]
+
+    # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
+    bsym = symbol + f'.{brokername}'
+    assert bsym in first_quotes
+
+    # we use the broker-specific fqsn (bfqsn) for
+    # the sampler subscription since the backend isn't (yet)
+    # expected to append its own name to the fqsn, so we filter
+    # on keys which *do not* include that name (e.g. .ib).
+    bus._subscribers.setdefault(bfqsn, [])
+
     # send this even to subscribers to existing feed?
     # deliver initial info message a first quote asap
-    await ctx.started((init_msg, first_quotes))
+    await ctx.started((
+        init_msg,
+        first_quotes,
+    ))

     if not start_stream:
         log.warning(f'Not opening real-time stream for {fqsn}')
@@ -505,14 +541,12 @@
             # re-send to trigger display loop cycle (necessary especially
             # when the mkt is closed and no real-time messages are
             # expected).
-            await stream.send(first_quotes)
+            await stream.send({fqsn: first_quotes})

+        # open a bg task which receives quotes over a mem chan
+        # and only pushes them to the target actor-consumer at
+        # a max ``tick_throttle`` instantaneous rate.
         if tick_throttle:
-
-            # open a bg task which receives quotes over a mem chan
-            # and only pushes them to the target actor-consumer at
-            # a max ``tick_throttle`` instantaneous rate.
-
             send, recv = trio.open_memory_channel(2**10)
             cs = await bus.start_task(
                 uniform_rate_send,
@@ -525,12 +559,15 @@
         else:
             sub = (stream, tick_throttle)

-        subs = bus._subscribers[symbol]
+        subs = bus._subscribers[bfqsn]
         subs.append(sub)

         try:
             uid = ctx.chan.uid

+            # ctrl protocol for start/stop of quote streams based on UI
+            # state (eg. don't need a stream when a symbol isn't being
+            # displayed).
             async for msg in stream:

                 if msg == 'pause':
@@ -555,7 +592,7 @@
         # n.cancel_scope.cancel()
         cs.cancel()
         try:
-            bus._subscribers[symbol].remove(sub)
+            bus._subscribers[bfqsn].remove(sub)
         except ValueError:
             log.warning(f'{sub} for {symbol} was already removed?')

@@ -681,10 +718,10 @@

 @asynccontextmanager
 async def open_feed(
-    brokername: str,
-    symbols: list[str],
-    loglevel: Optional[str] = None,
+    fqsns: list[str],
+
+    loglevel: Optional[str] = None,

     backpressure: bool = True,
     start_stream: bool = True,
     tick_throttle: Optional[float] = None,  # Hz
@@ -694,7 +731,10 @@
     Open a "data feed" which provides streamed real-time quotes.

     '''
-    sym = symbols[0].lower()
+    fqsn = fqsns[0].lower()
+
+    brokername, key, suffix = uncons_fqsn(fqsn)
+    bfqsn = fqsn.replace('.' + brokername, '')

     try:
         mod = get_brokermod(brokername)
@@ -715,7 +755,7 @@
         portal.open_context(
             open_feed_bus,
             brokername=brokername,
-            symbol=sym,
+            symbol=bfqsn,
             loglevel=loglevel,
             start_stream=start_stream,
             tick_throttle=tick_throttle,
@@ -732,9 +772,10 @@
     ):
         # we can only read from shm
         shm = attach_shm_array(
-            token=init_msg[sym]['shm_token'],
+            token=init_msg[bfqsn]['shm_token'],
             readonly=True,
         )
+        assert fqsn in first_quotes

         feed = Feed(
             name=brokername,
@@ -747,14 +788,15 @@
         )

         for sym, data in init_msg.items():
             si = data['symbol_info']
-            symbol = Symbol.from_broker_info(
-                brokername,
-                sym,
-                si,
+            fqsn = data['fqsn'] + f'.{brokername}'
+            symbol = Symbol.from_fqsn(
+                fqsn,
+                info=si,
             )
+            # symbol.broker_info[brokername] = si
+
+            feed.symbols[fqsn] = symbol
             feed.symbols[sym] = symbol

             # cast shm dtype to list... can't member why we need this
@@ -778,8 +820,7 @@

 @asynccontextmanager
 async def maybe_open_feed(
-    brokername: str,
-    symbols: list[str],
+    fqsns: list[str],
     loglevel: Optional[str] = None,

     **kwargs,
@@ -791,13 +832,12 @@
     in a tractor broadcast receiver.

     '''
-    sym = symbols[0].lower()
+    fqsn = fqsns[0]

     async with maybe_open_context(
         acm_func=open_feed,
         kwargs={
-            'brokername': brokername,
-            'symbols': [sym],
+            'fqsns': fqsns,
             'loglevel': loglevel,
             'tick_throttle': kwargs.get('tick_throttle'),
@@ -805,11 +845,11 @@
             'backpressure': kwargs.get('backpressure', True),
             'start_stream': kwargs.get('start_stream', True),
         },
-        key=sym,
+        key=fqsn,

     ) as (cache_hit, feed):

         if cache_hit:
-            log.info(f'Using cached feed for {brokername}.{sym}')
+            log.info(f'Using cached feed for {fqsn}')
             # add a new broadcast subscription for the quote stream
             # if this feed is likely already in use
             async with feed.stream.subscribe() as bstream:
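
For orientation, here is a minimal sketch of the naming scheme this patch adopts.
``uncons_fqsn_sketch()`` below is a hypothetical stand-in for the ``uncons_fqsn()``
helper imported at the top of the diff (the real implementation in
``piker/data/_source.py`` may differ in detail); it only illustrates the
``(brokername, key, suffix)`` unpack order and the ``bfqsn`` derivation used in
``open_feed()`` above.

# illustrative only: a hypothetical stand-in for ``uncons_fqsn()``; it
# mirrors how ``open_feed()`` unpacks a fully qualified symbol name of
# the form ``<key>.<venue>[.<expiry>].<broker>``.
def uncons_fqsn_sketch(fqsn: str) -> tuple[str, str, str]:
    tokens = fqsn.lower().split('.')
    brokername = tokens[-1]          # broker/provider suffix is last
    key = tokens[0]                  # base instrument key
    suffix = '.'.join(tokens[1:-1])  # venue and/or expiry, may be empty
    return brokername, key, suffix


fqsn = 'mnq.globex.20220717.ib'
brokername, key, suffix = uncons_fqsn_sketch(fqsn)

# the "broker-specific" fqsn (bfqsn) simply drops the broker suffix,
# mirroring ``fqsn.replace('.' + brokername, '')`` in ``open_feed()``:
bfqsn = fqsn.replace('.' + brokername, '')

assert (brokername, key, suffix) == ('ib', 'mnq', 'globex.20220717')
assert bfqsn == 'mnq.globex.20220717'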
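
Similarly, a hedged usage sketch of the reworked public entrypoint: callers now
pass fqsns (broker suffix included) instead of a separate ``brokername`` plus
symbol list. Only the ``.symbols`` and ``.stream`` attributes of ``Feed`` appear
in the diff above; the dict-of-quotes-keyed-by-fqsn message shape is an
assumption based on the ``{fqsn: first_quotes}`` send in ``open_feed_bus()``.

# illustrative only: consuming the fqsn-aware ``open_feed()`` api.
from piker.data.feed import open_feed


async def watch_mnq() -> None:
    async with open_feed(
        ['mnq.globex.ib'],   # fqsn(s): note the ``.ib`` broker suffix
        loglevel='info',
        tick_throttle=10,    # Hz; optional UI-side rate limiting
    ) as feed:
        # the "double entry" in the init msg means both the
        # broker-specific key and the fully qualified name may map to
        # the same symbol object.
        for key, sym in feed.symbols.items():
            print(f'feed symbol entry: {key} -> {sym}')

        async for quotes in feed.stream:
            # assumed shape: a dict of quote msgs keyed by fqsn
            for fqsn, quote in quotes.items():
                print(fqsn, quote)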