Use `MktPair` for `Flume.symbol` when used by backend

Initial attempt at getting the sampling and shm layer to use the new mkt
info meta-data type. Draft out a potential `BackendInitMsg:
msgspec.Struct` for validating the init msg returned from the
`stream_quotes()` start value; obvs don't actually use it yet.
pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-21 21:50:35 -04:00
parent 76fe9018cf
commit 986bb4e7c8
1 changed files with 43 additions and 12 deletions

View File

@ -933,6 +933,24 @@ async def manage_history(
await trio.sleep_forever()
class BackendInitMsg(Struct, frozen=True):
'''
A stringent data provider startup msg schema validator.
The fields defined here are matched with those absolutely required
from each backend broker/data provider.
'''
fqsn: str
symbol_info: dict | None = None
mkt_info: MktPair | None = None
shm_write_opts: dict[str, Any] | None = None
def validate_init_msg() -> None:
...
async def allocate_persistent_feed(
bus: _FeedsBus,
sub_registered: trio.Event,
@ -977,7 +995,10 @@ async def allocate_persistent_feed(
# establish broker backend quote stream by calling
# ``stream_quotes()``, which is a required broker backend endpoint.
init_msg, first_quote = await bus.nursery.start(
(
init_msg,
first_quote,
) = await bus.nursery.start(
partial(
mod.stream_quotes,
send_chan=send,
@ -1008,19 +1029,24 @@ async def allocate_persistent_feed(
# a small streaming machine around the remote feed which can then
# do the normal work of sampling and writing shm buffers
# (depending on if we want sampling done on the far end or not?)
msg = init_msg[symstr]
per_mkt_init_msg = init_msg[symstr]
# the broker-specific fully qualified symbol name,
# but ensure it is lower-cased for external use.
bs_mktid = msg['fqsn'].lower()
bs_mktid = per_mkt_init_msg['fqsn'].lower()
# true fqme including broker/provider suffix
fqme = '.'.join((bs_mktid, brokername))
mktinfo = msg.get('mkt_info')
mktinfo = per_mkt_init_msg.get('mkt_info')
if not mktinfo:
mktinfo = msg['symbol_info']
log.warning(
f'BACKEND {brokername} is using old `Symbol` style API\n'
'IT SHOULD BE PORTED TO THE NEW `.accounting._mktinfo.MktPair`\n'
'STATTTTT!!!\n'
)
mktinfo = per_mkt_init_msg['symbol_info']
# TODO: read out renamed/new tick size fields in block below!
price_tick = mktinfo.get(
@ -1042,10 +1068,15 @@ async def allocate_persistent_feed(
_atype=mktinfo['asset_type']
)
symbol = Symbol.from_fqsn(
fqsn=fqme,
info=mktinfo,
)
else:
# the new msg-protocol is to expect an already packed
# ``Asset`` and ``MktPair`` object from the backend
mkt = mktinfo
symbol = mkt = mktinfo
assert isinstance(mkt, MktPair)
assert isinstance(mkt.dst, Asset)
@ -1080,15 +1111,13 @@ async def allocate_persistent_feed(
log.info(f'waiting on history to load: {fqme}')
await some_data_ready.wait()
symbol = Symbol.from_fqsn(
fqsn=fqme,
info=msg['symbol_info'],
)
flume = Flume(
# TODO: we have to use this for now since currently the
# MktPair above doesn't render the correct output key it seems
# when we provide the `MktInfo` here?..?
symbol=symbol,
first_quote=first_quote,
_rt_shm_token=rt_shm.token,
_hist_shm_token=hist_shm.token,
@ -1109,6 +1138,8 @@ async def allocate_persistent_feed(
# the backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# NOTE: if not configured otherwise, we always sum tick volume
# values in the OHLCV sampler.
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
@ -1132,7 +1163,7 @@ async def allocate_persistent_feed(
rt_shm.array['time'][1] = ts + 1
elif hist_shm.array.size == 0:
await tractor.breakpoint()
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
# wait the spawning parent task to register its subscriber
# send-stream entry before we start the sample loop.
@ -1235,7 +1266,7 @@ async def open_feed_bus(
flume = bus.feeds[symbol]
sym = flume.symbol
bs_mktid = sym.key
fqsn = sym.fqsn # true fqsn
fqsn = sym.fqme # true fqsn
assert bs_mktid in fqsn and brokername in fqsn
if sym.suffix: