From 0f3041724b95cc4bbae9e08e5a9b0e29d417e835 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 21 Mar 2023 21:50:35 -0400 Subject: [PATCH] Use `MktPair` for `Flume.symbol` when used by backend Initial attempt at getting the sampling and shm layer to use the new mkt info meta-data type. Draft out a potential `BackendInitMsg: msgspec.Struct` for validating the init msg returned from the `stream_quotes()` start value; obvs don't actually use it yet. --- piker/data/feed.py | 55 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 13072acf..405a8f57 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -933,6 +933,24 @@ async def manage_history( await trio.sleep_forever() +class BackendInitMsg(Struct, frozen=True): + ''' + A stringent data provider startup msg schema validator. + + The fields defined here are matched with those absolutely required + from each backend broker/data provider. + + ''' + fqsn: str + symbol_info: dict | None = None + mkt_info: MktPair | None = None + shm_write_opts: dict[str, Any] | None = None + + +def validate_init_msg() -> None: + ... + + async def allocate_persistent_feed( bus: _FeedsBus, sub_registered: trio.Event, @@ -977,7 +995,10 @@ async def allocate_persistent_feed( # establish broker backend quote stream by calling # ``stream_quotes()``, which is a required broker backend endpoint. - init_msg, first_quote = await bus.nursery.start( + ( + init_msg, + first_quote, + ) = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, @@ -1008,19 +1029,24 @@ async def allocate_persistent_feed( # a small streaming machine around the remote feed which can then # do the normal work of sampling and writing shm buffers # (depending on if we want sampling done on the far end or not?) - msg = init_msg[symstr] + per_mkt_init_msg = init_msg[symstr] # the broker-specific fully qualified symbol name, # but ensure it is lower-cased for external use. - bs_mktid = msg['fqsn'].lower() + bs_mktid = per_mkt_init_msg['fqsn'].lower() # true fqme including broker/provider suffix fqme = '.'.join((bs_mktid, brokername)) - mktinfo = msg.get('mkt_info') + mktinfo = per_mkt_init_msg.get('mkt_info') if not mktinfo: - mktinfo = msg['symbol_info'] + log.warning( + f'BACKEND {brokername} is using old `Symbol` style API\n' + 'IT SHOULD BE PORTED TO THE NEW `.accounting._mktinfo.MktPair`\n' + 'STATTTTT!!!\n' + ) + mktinfo = per_mkt_init_msg['symbol_info'] # TODO: read out renamed/new tick size fields in block below! price_tick = mktinfo.get( @@ -1042,10 +1068,15 @@ async def allocate_persistent_feed( _atype=mktinfo['asset_type'] ) + symbol = Symbol.from_fqsn( + fqsn=fqme, + info=mktinfo, + ) + else: # the new msg-protocol is to expect an already packed # ``Asset`` and ``MktPair`` object from the backend - mkt = mktinfo + symbol = mkt = mktinfo assert isinstance(mkt, MktPair) assert isinstance(mkt.dst, Asset) @@ -1080,15 +1111,13 @@ async def allocate_persistent_feed( log.info(f'waiting on history to load: {fqme}') await some_data_ready.wait() - symbol = Symbol.from_fqsn( - fqsn=fqme, - info=msg['symbol_info'], - ) flume = Flume( + # TODO: we have to use this for now since currently the # MktPair above doesn't render the correct output key it seems # when we provide the `MktInfo` here?..? symbol=symbol, + first_quote=first_quote, _rt_shm_token=rt_shm.token, _hist_shm_token=hist_shm.token, @@ -1109,6 +1138,8 @@ async def allocate_persistent_feed( # the backend will indicate when real-time quotes have begun. await feed_is_live.wait() + # NOTE: if not configured otherwise, we always sum tick volume + # values in the OHLCV sampler. sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) @@ -1132,7 +1163,7 @@ async def allocate_persistent_feed( rt_shm.array['time'][1] = ts + 1 elif hist_shm.array.size == 0: - await tractor.breakpoint() + raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?') # wait the spawning parent task to register its subscriber # send-stream entry before we start the sample loop. @@ -1235,7 +1266,7 @@ async def open_feed_bus( flume = bus.feeds[symbol] sym = flume.symbol bs_mktid = sym.key - fqsn = sym.fqsn # true fqsn + fqsn = sym.fqme # true fqsn assert bs_mktid in fqsn and brokername in fqsn if sym.suffix: