diff --git a/piker/data/feed.py b/piker/data/feed.py index 530bed92..13072acf 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -26,6 +26,7 @@ from collections import ( Counter, ) from contextlib import asynccontextmanager as acm +from decimal import Decimal from datetime import datetime from functools import partial import time @@ -71,8 +72,10 @@ from ._sharedmem import ( from .ingest import get_ingestormod from .types import Struct from ..accounting._mktinfo import ( + Asset, + MktPair, + unpack_fqme, Symbol, - unpack_fqsn, ) from ._source import base_iohlc_dtype from ..ui import _search @@ -565,7 +568,7 @@ async def tsdb_backfill( timeframe=timeframe, ) - broker, symbol, expiry = unpack_fqsn(fqsn) + broker, symbol, expiry = unpack_fqme(fqsn) try: ( latest_start_dt, @@ -1009,17 +1012,44 @@ async def allocate_persistent_feed( # the broker-specific fully qualified symbol name, # but ensure it is lower-cased for external use. - bfqsn = msg['fqsn'].lower() + bs_mktid = msg['fqsn'].lower() - # true fqsn including broker/provider suffix - fqsn = '.'.join((bfqsn, brokername)) - # msg['fqsn'] = bfqsn + # true fqme including broker/provider suffix + fqme = '.'.join((bs_mktid, brokername)) - symbol = Symbol.from_fqsn( - fqsn=fqsn, - info=msg['symbol_info'], - ) - assert symbol.type_key + mktinfo = msg.get('mkt_info') + if not mktinfo: + + mktinfo = msg['symbol_info'] + + # TODO: read out renamed/new tick size fields in block below! + price_tick = mktinfo.get( + 'price_tick_size', + Decimal('0.01'), + ) + size_tick = mktinfo.get( + 'lot_tick_size', + Decimal('0.0'), + ) + + log.warning(f'FQME: {fqme} -> backend needs port to `MktPair`') + mkt = MktPair.from_fqme( + fqme, + price_tick=price_tick, + size_tick=size_tick, + bs_mktid=bs_mktid, + + _atype=mktinfo['asset_type'] + ) + + else: + # the new msg-protocol is to expect an already packed + # ``Asset`` and ``MktPair`` object from the backend + mkt = mktinfo + assert isinstance(mkt, MktPair) + assert isinstance(mkt.dst, Asset) + + assert mkt.type_key # HISTORY storage, run 2 tasks: # - a history loader / maintainer @@ -1040,17 +1070,24 @@ async def allocate_persistent_feed( manage_history, mod, bus, - fqsn, + fqme, some_data_ready, feed_is_live, ) # yield back control to starting nursery once we receive either # some history or a real-time quote. - log.info(f'waiting on history to load: {fqsn}') + log.info(f'waiting on history to load: {fqme}') await some_data_ready.wait() + symbol = Symbol.from_fqsn( + fqsn=fqme, + info=msg['symbol_info'], + ) flume = Flume( + # TODO: we have to use this for now since currently the + # MktPair above doesn't render the correct output key it seems + # when we provide the `MktInfo` here?..? symbol=symbol, first_quote=first_quote, _rt_shm_token=rt_shm.token, @@ -1061,7 +1098,7 @@ async def allocate_persistent_feed( # for ambiguous names we simply apply the retreived # feed to that name (for now). - bus.feeds[symstr] = bus.feeds[bfqsn] = flume + bus.feeds[symstr] = bus.feeds[bs_mktid] = flume task_status.started() @@ -1104,7 +1141,7 @@ async def allocate_persistent_feed( # start sample loop and shm incrementer task for OHLC style sampling # at the above registered step periods. try: - log.info(f'Starting sampler task for {fqsn}') + log.info(f'Starting sampler task for {fqme}') await sample_and_broadcast( bus, rt_shm, @@ -1114,7 +1151,7 @@ async def allocate_persistent_feed( sum_tick_vlm ) finally: - log.warning(f'{fqsn} feed task terminated') + log.warning(f'{fqme} feed task terminated') @tractor.context @@ -1197,22 +1234,22 @@ async def open_feed_bus( # subscriber flume = bus.feeds[symbol] sym = flume.symbol - bfqsn = sym.key + bs_mktid = sym.key fqsn = sym.fqsn # true fqsn - assert bfqsn in fqsn and brokername in fqsn + assert bs_mktid in fqsn and brokername in fqsn if sym.suffix: - bfqsn = fqsn.removesuffix(f'.{brokername}') - log.warning(f'{brokername} expanded symbol {symbol} -> {bfqsn}') + bs_mktid = fqsn.removesuffix(f'.{brokername}') + log.warning(f'{brokername} expanded symbol {symbol} -> {bs_mktid}') # pack for ``.started()`` sync msg flumes[fqsn] = flume - # we use the broker-specific fqsn (bfqsn) for - # the sampler subscription since the backend isn't (yet) - # expected to append it's own name to the fqsn, so we filter - # on keys which *do not* include that name (e.g .ib) . - bus._subscribers.setdefault(bfqsn, set()) + # we use the broker-specific market id (bs_mktid) for the + # sampler subscription since the backend isn't (yet) expected to + # append it's own name to the fqsn, so we filter on keys which + # *do not* include that name (e.g .ib) . + bus._subscribers.setdefault(bs_mktid, set()) # sync feed subscribers with flume handles await ctx.started( @@ -1276,9 +1313,9 @@ async def open_feed_bus( # maybe use the current task-id to key the sub list that's # added / removed? Or maybe we can add a general # pause-resume by sub-key api? - bfqsn = fqsn.removesuffix(f'.{brokername}') - local_subs.setdefault(bfqsn, set()).add(sub) - bus.add_subs(bfqsn, {sub}) + bs_mktid = fqsn.removesuffix(f'.{brokername}') + local_subs.setdefault(bs_mktid, set()).add(sub) + bus.add_subs(bs_mktid, {sub}) # sync caller with all subs registered state sub_registered.set() @@ -1291,16 +1328,16 @@ async def open_feed_bus( async for msg in stream: if msg == 'pause': - for bfqsn, subs in local_subs.items(): + for bs_mktid, subs in local_subs.items(): log.info( - f'Pausing {bfqsn} feed for {uid}') - bus.remove_subs(bfqsn, subs) + f'Pausing {bs_mktid} feed for {uid}') + bus.remove_subs(bs_mktid, subs) elif msg == 'resume': - for bfqsn, subs in local_subs.items(): + for bs_mktid, subs in local_subs.items(): log.info( - f'Resuming {bfqsn} feed for {uid}') - bus.add_subs(bfqsn, subs) + f'Resuming {bs_mktid} feed for {uid}') + bus.add_subs(bs_mktid, subs) else: raise ValueError(msg) @@ -1314,8 +1351,8 @@ async def open_feed_bus( cs.cancel() # drop all subs for this task from the bus - for bfqsn, subs in local_subs.items(): - bus.remove_subs(bfqsn, subs) + for bs_mktid, subs in local_subs.items(): + bus.remove_subs(bs_mktid, subs) class Feed(Struct): @@ -1512,7 +1549,7 @@ async def open_feed( feed = Feed() for fqsn in fqsns: - brokername, key, suffix = unpack_fqsn(fqsn) + brokername, key, suffix = unpack_fqme(fqsn) bfqsn = fqsn.replace('.' + brokername, '') try: