Use `MktPair` for `Flume.symbol` when used by backend

Initial attempt at getting the sampling and shm layer to use the new mkt
info meta-data type. Draft out a potential `BackendInitMsg:
msgspec.Struct` for validating the init msg returned from the
`stream_quotes()` start value; obviously don't actually use it yet.
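
For context, the init msg consumed below is a per-symbol mapping; a rough sketch of the payload shape this draft schema would validate (field names taken from the diff, the example key and all values purely illustrative):

    # hypothetical per-symbol init msg entry as implied by the fields
    # read in `allocate_persistent_feed()`; all values are made up.
    init_msg: dict[str, dict] = {
        'btcusdt': {
            # broker-specific fully qualified symbol name
            'fqsn': 'btcusdt',

            # legacy-style metadata dict (old `Symbol` API)
            'symbol_info': {'asset_type': 'crypto'},

            # new-style: an already packed `MktPair` (preferred), else None
            'mkt_info': None,

            # sampler/shm tweaks, e.g. tick volume summing
            'shm_write_opts': {'sum_tick_vlm': True},
        },
    }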
rekt_pps
Tyler Goodlet 2023-03-21 21:50:35 -04:00
parent 1d08ee6d01
commit 0f3041724b
1 changed file with 43 additions and 12 deletions


@@ -933,6 +933,24 @@ async def manage_history(
     await trio.sleep_forever()


+class BackendInitMsg(Struct, frozen=True):
+    '''
+    A stringent data provider startup msg schema validator.
+
+    The fields defined here are matched with those absolutely required
+    from each backend broker/data provider.
+
+    '''
+    fqsn: str
+    symbol_info: dict | None = None
+    mkt_info: MktPair | None = None
+    shm_write_opts: dict[str, Any] | None = None
+
+
+def validate_init_msg() -> None:
+    ...
+
+
 async def allocate_persistent_feed(
     bus: _FeedsBus,
     sub_registered: trio.Event,
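
The `validate_init_msg()` stub above is left empty in this commit; as a non-authoritative sketch of one way it could eventually be fleshed out (assuming msgspec's typed-decode for schema checking, and with `mkt_info` loosened to a plain `dict` so the snippet runs standalone):

    from typing import Any

    import msgspec


    class BackendInitMsg(msgspec.Struct, frozen=True):
        # NOTE: `mkt_info` is typed as `dict` here (not `MktPair`) purely
        # so this sketch is self-contained.
        fqsn: str
        symbol_info: dict | None = None
        mkt_info: dict | None = None
        shm_write_opts: dict[str, Any] | None = None


    def validate_init_msg(per_mkt_msg: dict[str, Any]) -> BackendInitMsg:
        '''
        Schema-check one per-symbol entry of a backend's init msg;
        raises `msgspec.ValidationError` on missing or mistyped fields.

        '''
        # round-trip through the JSON codec purely for validation.
        return msgspec.json.decode(
            msgspec.json.encode(per_mkt_msg),
            type=BackendInitMsg,
        )


    # a legacy-style payload passes; one missing `fqsn` would raise.
    validate_init_msg({
        'fqsn': 'btcusdt',
        'symbol_info': {'asset_type': 'crypto'},
    })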
@@ -977,7 +995,10 @@ async def allocate_persistent_feed(

     # establish broker backend quote stream by calling
     # ``stream_quotes()``, which is a required broker backend endpoint.
-    init_msg, first_quote = await bus.nursery.start(
+    (
+        init_msg,
+        first_quote,
+    ) = await bus.nursery.start(
         partial(
             mod.stream_quotes,
             send_chan=send,
@@ -1008,19 +1029,24 @@ async def allocate_persistent_feed(
     # a small streaming machine around the remote feed which can then
     # do the normal work of sampling and writing shm buffers
     # (depending on if we want sampling done on the far end or not?)
-    msg = init_msg[symstr]
+    per_mkt_init_msg = init_msg[symstr]

     # the broker-specific fully qualified symbol name,
     # but ensure it is lower-cased for external use.
-    bs_mktid = msg['fqsn'].lower()
+    bs_mktid = per_mkt_init_msg['fqsn'].lower()

     # true fqme including broker/provider suffix
     fqme = '.'.join((bs_mktid, brokername))

-    mktinfo = msg.get('mkt_info')
+    mktinfo = per_mkt_init_msg.get('mkt_info')
     if not mktinfo:
-        mktinfo = msg['symbol_info']
+        log.warning(
+            f'BACKEND {brokername} is using old `Symbol` style API\n'
+            'IT SHOULD BE PORTED TO THE NEW `.accounting._mktinfo.MktPair`\n'
+            'STATTTTT!!!\n'
+        )
+        mktinfo = per_mkt_init_msg['symbol_info']

     # TODO: read out renamed/new tick size fields in block below!
     price_tick = mktinfo.get(
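
For reference, the `fqme` assembled above is just the lower-cased backend market id dotted with the broker name; a trivial illustration (symbol and broker values made up):

    bs_mktid = 'BTCUSDT'.lower()
    fqme = '.'.join((bs_mktid, 'binance'))
    assert fqme == 'btcusdt.binance'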
@@ -1042,10 +1068,15 @@ async def allocate_persistent_feed(
             _atype=mktinfo['asset_type']
         )

+        symbol = Symbol.from_fqsn(
+            fqsn=fqme,
+            info=mktinfo,
+        )
+
     else:
         # the new msg-protocol is to expect an already packed
         # ``Asset`` and ``MktPair`` object from the backend
-        mkt = mktinfo
+        symbol = mkt = mktinfo
         assert isinstance(mkt, MktPair)
         assert isinstance(mkt.dst, Asset)
@@ -1080,15 +1111,13 @@ async def allocate_persistent_feed(
     log.info(f'waiting on history to load: {fqme}')
     await some_data_ready.wait()

-    symbol = Symbol.from_fqsn(
-        fqsn=fqme,
-        info=msg['symbol_info'],
-    )
-
     flume = Flume(

         # TODO: we have to use this for now since currently the
         # MktPair above doesn't render the correct output key it seems
         # when we provide the `MktInfo` here?..?
         symbol=symbol,

         first_quote=first_quote,
         _rt_shm_token=rt_shm.token,
         _hist_shm_token=hist_shm.token,
@@ -1109,6 +1138,8 @@ async def allocate_persistent_feed(
     # the backend will indicate when real-time quotes have begun.
     await feed_is_live.wait()

+    # NOTE: if not configured otherwise, we always sum tick volume
+    # values in the OHLCV sampler.
     sum_tick_vlm: bool = init_msg.get(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)
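
As the defaulted lookup above implies, a backend can disable tick-volume summing via `shm_write_opts`; a minimal sketch of that read (msg contents hypothetical):

    msg = {'shm_write_opts': {'sum_tick_vlm': False}}

    # mirrors the `.get()` chain above: missing keys default to True.
    sum_tick_vlm: bool = msg.get(
        'shm_write_opts', {}
    ).get('sum_tick_vlm', True)
    assert sum_tick_vlm is False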
@@ -1132,7 +1163,7 @@ async def allocate_persistent_feed(
             rt_shm.array['time'][1] = ts + 1

     elif hist_shm.array.size == 0:
-        await tractor.breakpoint()
+        raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')

     # wait the spawning parent task to register its subscriber
     # send-stream entry before we start the sample loop.
@@ -1235,7 +1266,7 @@ async def open_feed_bus(
         flume = bus.feeds[symbol]
         sym = flume.symbol
         bs_mktid = sym.key
-        fqsn = sym.fqsn  # true fqsn
+        fqsn = sym.fqme  # true fqsn
         assert bs_mktid in fqsn and brokername in fqsn

         if sym.suffix: