Add `.data.validate` checker for live feed layer

More or less a replacement for @guilledk's initial attempt at a
"broker check" type script a while back, except in this case we always
run this validation routine, and it now uses a new `FeedInit` struct to
ensure backends are delivering correctly schema-ed data during startup.
This also gives us one spot to stick deprecation warnings and/or strict
API compat errors (at least for live feeds).
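
To make the new handshake concrete, here's a minimal sketch of what a
conforming backend's `stream_quotes()` might now deliver. The
`mybroker` module name, tick sizes and quote values are hypothetical,
and the exact `MktPair` construction is illustrative only; a fully
spec-compliant backend packs an `Asset` into `mkt_info.dst` (see the
strict `FeedInit` match case in the new validator):

    # Hypothetical `mybroker` backend: the new-style startup msg.
    # `stream_quotes()` signals readiness with a list of `FeedInit`
    # structs plus a first quote, instead of the deprecated
    # `dict[str, dict]` symbol-info style.
    from decimal import Decimal

    import trio

    from piker.accounting import MktPair
    from piker.data.validate import FeedInit


    async def stream_quotes(
        send_chan,
        feed_is_live: trio.Event,
        symbols: list[str],
        loglevel: str = '',
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        sym: str = symbols[0]
        init = FeedInit(
            bs_mktid=sym,  # backend specific (unique) market id
            mkt_info=MktPair.from_fqme(
                f'{sym}.mybroker',
                price_tick=Decimal('0.01'),
                size_tick=Decimal('1'),
                bs_mktid=sym,
            ),
            shm_write_opts={'sum_tick_vlm': True},
        )
        # hand back our init msg(s) and a (made up) first quote,
        # matching the `(init_msgs, first_quote)` unpacking done by
        # `allocate_persistent_feed()` in the diff below.
        task_status.started(([init], {'last': 1234.5}))
        feed_is_live.set()
        # ... then forward live ticks over `send_chan` ...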

Factors out a bunch of `MktPair`-related adapter logic into a new
`.validate.validate_backend()` which warns the backend implementer, via
log msgs, about all outstanding problems. Ideally we also do our
backend-module endpoint scan-and-complain regarding missing feature
support from here as well (eg. search, broker/trade ctl, ledger
processing, etc.).
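
And a distilled sketch of the consuming side, mirroring the new
`allocate_persistent_feed()` hunk below; the `audit_feed_init()`
wrapper name is hypothetical, but the `validate_backend()` call shape
matches the diff:

    # Sketch: both new-style `list[FeedInit]` and legacy dict msgs
    # funnel through the one validation / deprecation-warning spot.
    from types import ModuleType

    from piker.data.validate import FeedInit, validate_backend


    def audit_feed_init(
        mod: ModuleType,
        symstr: str,
        init_msgs,  # list[FeedInit] | legacy dict[str, dict]
    ) -> FeedInit:
        # normalize/validate whatever msg style the backend delivered;
        # dict-style input trips the deprecation warning branch.
        init: FeedInit = validate_backend(mod, [symstr], init_msgs)
        mkt = init.mkt_info  # always a packed `MktPair` after validation
        assert mkt.bs_mktid == init.bs_mktid
        return init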
Tyler Goodlet 2023-04-18 19:05:42 -04:00
parent d48b2c5b57
commit 4129d693be
2 changed files with 255 additions and 95 deletions

piker/data/feed.py

@@ -26,7 +26,7 @@ from collections import (
     Counter,
 )
 from contextlib import asynccontextmanager as acm
-from decimal import Decimal
+# from decimal import Decimal
 from datetime import datetime
 from functools import partial
 import time
@@ -55,8 +55,8 @@ import numpy as np
 from ..brokers import get_brokermod
 from ..calc import humanize
-from ..log import (
-    get_logger,
+from ._util import (
+    log,
     get_console_log,
 )
 from ..service import (
@@ -64,6 +64,10 @@ from ..service import (
     check_for_service,
 )
 from .flows import Flume
+from .validate import (
+    FeedInit,
+    validate_backend,
+)
 from ._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
@@ -72,10 +76,8 @@ from ._sharedmem import (
 from .ingest import get_ingestormod
 from .types import Struct
 from ..accounting._mktinfo import (
-    Asset,
     MktPair,
     unpack_fqme,
-    Symbol,
 )
 from ._source import base_iohlc_dtype
 from ..ui import _search
@@ -91,8 +93,6 @@ from ..brokers._util import (
 if TYPE_CHECKING:
     from ..service.marketstore import Storage

-log = get_logger(__name__)
-

 class _FeedsBus(Struct):
     '''
@@ -568,7 +568,7 @@ async def tsdb_backfill(
         timeframe=timeframe,
     )
-    broker, symbol, expiry = unpack_fqme(fqsn)
+    broker, *_ = unpack_fqme(fqsn)
     try:
         (
             latest_start_dt,
@@ -790,13 +790,14 @@ async def manage_history(
     # port = _runtime_vars['_root_mailbox'][1]
     uid = tractor.current_actor().uid
-    suffix = '.'.join(uid)
+    name, uuid = uid
+    service = name.rstrip(f'.{mod.name}')

     # (maybe) allocate shm array for this broker/symbol which will
     # be used for fast near-term history capture and processing.
     hist_shm, opened = maybe_open_shm_array(
         # key=f'{fqsn}_hist_p{port}',
-        key=f'{fqsn}_hist.{suffix}',
+        key=f'piker.{service}[{uuid[:16]}.{fqsn}.hist',

         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -814,7 +815,8 @@ async def manage_history(
     rt_shm, opened = maybe_open_shm_array(
         # key=f'{fqsn}_rt_p{port}',
-        key=f'{fqsn}_rt.{suffix}',
+        # key=f'piker.{service}.{fqsn}_rt.{uuid}',
+        key=f'piker.{service}[{uuid[:16]}.{fqsn}.rt',

         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -933,24 +935,6 @@ async def manage_history(
     await trio.sleep_forever()

-class BackendInitMsg(Struct, frozen=True):
-    '''
-    A stringent data provider startup msg schema validator.
-
-    The fields defined here are matched with those absolutely required
-    from each backend broker/data provider.
-
-    '''
-    fqme: str
-    symbol_info: dict | None = None
-    mkt_info: MktPair | None = None
-    shm_write_opts: dict[str, Any] | None = None
-
-
-def validate_init_msg() -> None:
-    ...
-
 async def allocate_persistent_feed(
     bus: _FeedsBus,
     sub_registered: trio.Event,
@@ -961,7 +945,7 @@ async def allocate_persistent_feed(
     loglevel: str,

     start_stream: bool = True,
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+    task_status: TaskStatus[FeedInit] = trio.TASK_STATUS_IGNORED,

 ) -> None:
     '''
@@ -991,22 +975,37 @@ async def allocate_persistent_feed(
     some_data_ready = trio.Event()
     feed_is_live = trio.Event()

-    symstr = symstr.lower()
-
     # establish broker backend quote stream by calling
-    # ``stream_quotes()``, which is a required broker backend endpoint.
+    # ``stream_quotes()``, a required broker backend endpoint.
+    init_msgs: (
+        list[FeedInit]  # new
+        | dict[str, dict[str, str]]  # legacy / deprecated
+    )
+
+    # TODO: probably make a struct msg type for this as well
+    # since eventually we do want to have more efficient IPC..
+    first_quote: dict[str, Any]
+    symstr = symstr.lower()
     (
-        init_msg,
+        init_msgs,
         first_quote,
     ) = await bus.nursery.start(
         partial(
             mod.stream_quotes,
             send_chan=send,
             feed_is_live=feed_is_live,
+
+            # NOTE / TODO: eventualy we may support providing more then
+            # one input here such that a datad daemon can multiplex
+            # multiple live feeds from one task, instead of getting
+            # a new request (and thus new task) for each subscription.
             symbols=[symstr],
+
             loglevel=loglevel,
         )
     )

     # TODO: this is indexed by symbol for now since we've planned (for
     # some time) to expect backends to handle single
     # ``.stream_quotes()`` calls with multiple symbols inputs to just
@@ -1029,58 +1028,15 @@ async def allocate_persistent_feed(
     # a small streaming machine around the remote feed which can then
     # do the normal work of sampling and writing shm buffers
     # (depending on if we want sampling done on the far end or not?)
-    per_mkt_init_msg = init_msg[symstr]
-
-    # the broker-specific fully qualified symbol name,
-    # but ensure it is lower-cased for external use.
-    bs_mktid = per_mkt_init_msg['fqsn'].lower()
-
-    # true fqme including broker/provider suffix
-    fqme = '.'.join((bs_mktid, brokername))
-
-    mktinfo = per_mkt_init_msg.get('mkt_info')
-    if not mktinfo:
-        log.warning(
-            f'BACKEND {brokername} is using old `Symbol` style API\n'
-            'IT SHOULD BE PORTED TO THE NEW `.accounting._mktinfo.MktPair`\n'
-            'STATTTTT!!!\n'
-        )
-        mktinfo = per_mkt_init_msg['symbol_info']
-
-        # TODO: read out renamed/new tick size fields in block below!
-        price_tick = mktinfo.get(
-            'price_tick_size',
-            Decimal('0.01'),
-        )
-        size_tick = mktinfo.get(
-            'lot_tick_size',
-            Decimal('0.0'),
-        )
-
-        log.warning(f'FQME: {fqme} -> backend needs port to `MktPair`')
-        mkt = MktPair.from_fqme(
-            fqme,
-            price_tick=price_tick,
-            size_tick=size_tick,
-            bs_mktid=bs_mktid,
-            _atype=mktinfo['asset_type']
-        )
-
-        symbol = Symbol.from_fqsn(
-            fqsn=fqme,
-            info=mktinfo,
-        )
-
-    else:
-        # the new msg-protocol is to expect an already packed
-        # ``Asset`` and ``MktPair`` object from the backend
-        symbol = mkt = mktinfo
-        assert isinstance(mkt, MktPair)
-        assert isinstance(mkt.dst, Asset)
-        assert mkt.type_key
+    init: FeedInit = validate_backend(
+        mod,
+        [symstr],
+        init_msgs,
+    )
+    bs_mktid: str = init.bs_mktid
+    mkt: MktPair = init.mkt_info
+    assert mkt.bs_mktid == bs_mktid
+    fqme: str = mkt.fqme

     # HISTORY storage, run 2 tasks:
     # - a history loader / maintainer
@@ -1116,7 +1072,7 @@ async def allocate_persistent_feed(
         # TODO: we have to use this for now since currently the
         # MktPair above doesn't render the correct output key it seems
         # when we provide the `MktInfo` here?..?
-        mkt=symbol,
+        mkt=mkt,
         first_quote=first_quote,
         _rt_shm_token=rt_shm.token,
@@ -1125,11 +1081,17 @@ async def allocate_persistent_feed(
         izero_rt=izero_rt,
     )

-    # for ambiguous names we simply apply the retreived
+    # for ambiguous names we simply register the
+    # flume for all possible name (sub) sets.
     # feed to that name (for now).
-    bus.feeds[symstr] = bus.feeds[bs_mktid] = flume
+    bus.feeds.update({
+        symstr: flume,
+        fqme: flume,
+        mkt.bs_fqme: flume,
+    })

-    task_status.started()
+    # signal the ``open_feed_bus()`` caller task to continue
+    task_status.started(init)

     if not start_stream:
         await trio.sleep_forever()
@@ -1140,9 +1102,7 @@ async def allocate_persistent_feed(
     # NOTE: if not configured otherwise, we always sum tick volume
     # values in the OHLCV sampler.
-    sum_tick_vlm: bool = init_msg.get(
-        'shm_write_opts', {}
-    ).get('sum_tick_vlm', True)
+    sum_tick_vlm: bool = (init.shm_write_opts or {}).get('sum_tick_vlm', True)

     # NOTE: if no high-freq sampled data has (yet) been loaded,
     # seed the buffer with a history datum - this is most handy
@@ -1218,7 +1178,6 @@ async def open_feed_bus(
     # ensure we are who we think we are
     servicename = tractor.current_actor().name
     assert 'brokerd' in servicename
-
     assert brokername in servicename

     bus = get_feed_bus(brokername)
@@ -1573,7 +1532,7 @@ async def open_feed(
     feed = Feed()

     for fqsn in fqsns:
-        brokername, key, suffix = unpack_fqme(fqsn)
+        brokername, *_ = unpack_fqme(fqsn)
         bfqsn = fqsn.replace('.' + brokername, '')

         try:
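
Side note on the shm-key rename in `manage_history()` above: the new
template renders keys like the following sketch (all values
hypothetical); note the lone `[` is emitted literally, exactly as the
f-string is written:

    # Worked example of the new shm-key template (values are made up).
    name, uuid = 'brokerd.mybroker', '3fa85f6457174562b3fc2c963f66afa6'
    service = name.rstrip('.mybroker')  # -> 'brokerd' (mod.name == 'mybroker')
    fqsn = 'xyzusd.mybroker'

    print(f'piker.{service}[{uuid[:16]}.{fqsn}.hist')
    # piker.brokerd[3fa85f6457174562.xyzusd.mybroker.hist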

piker/data/validate.py (new file)

@@ -0,0 +1,201 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''
Data feed synchronization protocols, init msgs, and general
data-provider-backend-agnostic schema definitions.

'''
from decimal import Decimal
from pprint import pformat
from types import ModuleType
from typing import (
    Any,
)

from .types import Struct
from ..accounting import (
    Asset,
    MktPair,
)
from ._util import log


class FeedInitializationError(ValueError):
    '''
    Live data feed setup failed due to API / msg incompatiblity!

    '''


class FeedInit(Struct, frozen=True):
    '''
    A stringent data provider startup msg schema validator.

    The fields defined here are matched with those absolutely required
    from each backend broker/data provider.

    '''
    # backend specific, market endpoint id
    bs_mktid: str
    mkt_info: MktPair
    shm_write_opts: dict[str, Any] | None = None


def validate_backend(
    mod: ModuleType,
    syms: list[str],
    init_msgs: list[FeedInit] | dict[str, dict[str, Any]],

    # TODO: do a module method scan and report mismatches.
    check_eps: bool = False,

    api_log_msg_level: str = 'critical'

) -> FeedInit:
    '''
    Fail on malformed live quotes feed config/init or warn on changes
    that haven't been implemented by this backend yet.

    '''
    if isinstance(init_msgs, dict):
        for i, (sym_str, msg) in enumerate(init_msgs.items()):
            init: FeedInit | dict[str, Any] = msg

            # XXX: eventually this WILL NOT necessarily be true.
            if i > 0:
                assert not len(init_msgs) == 1
                keys: set = set(init_msgs.keys()) - set(syms)
                raise FeedInitializationError(
                    'TOO MANY INIT MSGS!\n'
                    f'Unexpected keys: {keys}\n'
                    'ALL MSGS:\n'
                    f'{pformat(init_msgs)}\n'
                )

    # TODO: once all backends are updated we can remove this branching.
    rx_msg: bool = False
    warn_msg: str = ''
    if not isinstance(init, FeedInit):
        warn_msg += (
            '\n'
            '--------------------------\n'
            ':::DEPRECATED API STYLE:::\n'
            '--------------------------\n'
            f'`{mod.name}.stream_quotes()` should deliver '
            '`.started(FeedInit)`\n'
            f'|-> CURRENTLY it is using DEPRECATED `.started(dict)` style!\n'
            f'|-> SEE `FeedInit` in `piker.data.validate`\n'
            '--------------------------------------------\n'
        )
    else:
        rx_msg = True

    # verify feed init state / schema
    bs_mktid: str  # backend specific (unique) market id
    bs_fqme: str  # backend specific fqme
    mkt: MktPair

    match init:
        case {
            'symbol_info': dict(symbol_info),
            'fqsn': bs_fqme,
        } | {
            'mkt_info': dict(symbol_info),
            'fqsn': bs_fqme,
        }:
            symbol_info: dict
            warn_msg += (
                'It may also be still using the legacy `Symbol` style API\n'
                'IT SHOULD BE PORTED TO THE NEW '
                '`.accounting._mktinfo.MktPair`\n'
                'STATTTTT!!!\n'
            )

            # XXX use default legacy (aka discrete precision) mkt
            # price/size_ticks if none delivered.
            price_tick = symbol_info.get(
                'price_tick_size',
                Decimal('0.01'),
            )
            size_tick = symbol_info.get(
                'lot_tick_size',
                Decimal('1'),
            )
            mkt = MktPair.from_fqme(
                fqme=f'{bs_fqme}.{mod.name}',
                price_tick=price_tick,
                size_tick=size_tick,
                bs_mktid=str(init['bs_mktid']),
                _atype=symbol_info['asset_type']
            )

        case {
            'mkt_info': MktPair(
                dst=Asset(),
            ) as mkt,
            'fqsn': bs_fqme,
        }:
            warn_msg += (
                f'{mod.name} in API compat transition?\n'
                "It's half dict, half man..\n"
                '-------------------------------------\n'
            )

        case FeedInit(
            # bs_mktid=bs_mktid,
            mkt_info=MktPair(dst=Asset()) as mkt,
            shm_write_opts=dict(),
        ) as init:
            log.info(
                f'NICE JOB {mod.name} BACKEND!\n'
                'You are fully up to API spec B):\n'
                f'{init.to_dict()}'
            )

        case _:
            raise FeedInitializationError(init)

    # build a msg if we received a dict for input.
    if not rx_msg:
        init = FeedInit(
            bs_mktid=mkt.bs_mktid,
            mkt_info=mkt,
            shm_write_opts=init.get('shm_write_opts'),
        )

    # `MktPair` value audits
    mkt = init.mkt_info
    assert bs_fqme in mkt.fqme
    assert mkt.type_key

    # `MktPair` wish list
    if not isinstance(mkt.src, Asset):
        warn_msg += (
            f'ALSO, {mod.name.upper()} should try to deliver\n'
            'the new `MktPair.src: Asset` field!\n'
            '-----------------------------------------------\n'
        )

    # complain about any non-idealities
    if warn_msg:
        # TODO: would be nice to register an API_COMPAT or something in
        # maybe cyan for this in general throughput piker no?
        logmeth = getattr(log, api_log_msg_level)
        logmeth(warn_msg)

    return init.copy()
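
Lastly, a hedged smoke-test sketch of the strict fallthrough path: a
msg that matches none of the `match` arms above must raise
`FeedInitializationError`. The `SimpleNamespace` stub and test name are
hypothetical; the stub only needs the `name` attr that piker backend
modules expose:

    # Hypothetical test: garbage legacy-dict input should hit `case _`.
    from types import SimpleNamespace

    import pytest

    from piker.data.validate import (
        FeedInitializationError,
        validate_backend,
    )


    def test_malformed_init_msg_raises() -> None:
        mod = SimpleNamespace(name='stubbroker')  # stand-in backend module
        with pytest.raises(FeedInitializationError):
            validate_backend(
                mod,
                ['xyzusd'],
                {'xyzusd': {'not': 'a valid schema'}},
            )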