Make the data feed layer "fqsn" aware

In order to support instruments with lifetimes (aka derivatives) we need
generally need special symbol annotations which detail such meta data
(such as `MNQ.GLOBEX.20220717` for daq futes). Further there is really
no reason for the public api for this feed layer to care about getting
a special "brokername" field since generally the data is coming directly
from UIs (eg. search selection) so we might as well accept a fqsn (fully
qualified symbol name) which includes the broker name; for now a suffix
like `'.ib'`. We may change this schema (soon) but this at least gets us
to a point where we expect the full name including broker/provider.

An additional detail: for certain "generic" symbol names (like for
futes) we will pull a so called "front contract" and map this to
a specific fqsn underneath, so there is a double (cached) entry for that
entry such that other consumers can use it the same way if desired.

Some other machinery changes:
- expect the `stream_quotes()` endpoint to deliver it's `.started()` msg
  almost immediately since we now need it deliver any fqsn asap (yes
  this means the ep should no longer wait on a "live" first quote and
  instead deliver what quote data it can right away.
- expect the quotes ohlc sampler task to add in the broker name before
  broadcast to remote (actor) consumers since the backend isn't (yet)
  expected to do that add in itself.
- obviously we start using all the new fqsn related `Symbol` apis
mkts_backup
Tyler Goodlet 2022-03-18 14:47:28 -04:00
parent e9ed070cbf
commit a8cb6c2056
1 changed files with 111 additions and 71 deletions

View File

@ -55,7 +55,7 @@ from .ingest import get_ingestormod
from ._source import (
base_iohlc_dtype,
Symbol,
mk_fqsn,
uncons_fqsn,
)
from ..ui import _search
from ._sampling import (
@ -195,7 +195,7 @@ async def _setup_persistent_brokerd(
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
symbol: str,
fqsn: str,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
@ -209,8 +209,6 @@ async def manage_history(
buffer.
'''
fqsn = mk_fqsn(mod.name, symbol)
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
@ -232,10 +230,13 @@ async def manage_history(
async with marketstore.open_storage_client(
fqsn,
) as (storage, tsdb_arrays):
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
if tsdb_arrays:
log.info(f'Loaded tsdb history {tsdb_arrays}')
@ -264,7 +265,7 @@ async def manage_history(
)
# start history anal and load missing new data via backend.
async with mod.open_history_client(symbol) as hist:
async with mod.open_history_client(fqsn) as hist:
# get latest query's worth of history
array, next_dt = await hist(end_dt='')
@ -280,7 +281,7 @@ async def manage_history(
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
_ = await bus.nursery.start(mod.backfill_bars, symbol, shm)
_ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
# yield back after client connect with filled shm
task_status.started(shm)
@ -341,8 +342,6 @@ async def allocate_persistent_feed(
except ImportError:
mod = get_ingestormod(brokername)
fqsn = mk_fqsn(brokername, symbol)
# mem chan handed to broker backend so it can push real-time
# quotes to this task for sampling and history storage (see below).
send, quote_stream = trio.open_memory_channel(10)
@ -351,28 +350,9 @@ async def allocate_persistent_feed(
some_data_ready = trio.Event()
feed_is_live = trio.Event()
# run 2 tasks:
# - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured).
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
shm = await bus.nursery.start(
manage_history,
mod,
bus,
symbol,
some_data_ready,
feed_is_live,
)
# establish broker backend quote stream by calling
# ``stream_quotes()``, which is a required broker backend endpoint.
init_msg, first_quotes = await bus.nursery.start(
init_msg, first_quote = await bus.nursery.start(
partial(
mod.stream_quotes,
send_chan=send,
@ -381,15 +361,38 @@ async def allocate_persistent_feed(
loglevel=loglevel,
)
)
# the broker-specific fully qualified symbol name
bfqsn = init_msg[symbol]['fqsn']
# HISTORY, run 2 tasks:
# - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured).
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
shm = await bus.nursery.start(
manage_history,
mod,
bus,
bfqsn,
some_data_ready,
feed_is_live,
)
# we hand an IPC-msg compatible shm token to the caller so it
# can read directly from the memory which will be written by
# this task.
init_msg[symbol]['shm_token'] = shm.token
# symbol = Symbol.from_broker_info(
# fqsn,
# init_msg[symbol]['symbol_info']
# )
msg = init_msg[symbol]
msg['shm_token'] = shm.token
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
init_msg[fqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
@ -402,8 +405,22 @@ async def allocate_persistent_feed(
log.info(f'waiting on history to load: {fqsn}')
await some_data_ready.wait()
bus.feeds[symbol.lower()] = (init_msg, first_quotes)
task_status.started((init_msg, first_quotes))
# append ``.<broker>`` suffix to each quote symbol
bsym = symbol + f'.{brokername}'
generic_first_quotes = {
bsym: first_quote,
fqsn: first_quote,
}
bus.feeds[symbol] = bus.feeds[fqsn] = (
init_msg,
generic_first_quotes,
)
# for ambiguous names we simply apply the retreived
# feed to that name (for now).
# task_status.started((init_msg, generic_first_quotes))
task_status.started()
if not start_stream:
await trio.sleep_forever()
@ -421,10 +438,11 @@ async def allocate_persistent_feed(
bus,
shm,
quote_stream,
brokername,
sum_tick_vlm
)
finally:
log.warning(f'{symbol}@{brokername} feed task terminated')
log.warning(f'{fqsn} feed task terminated')
@tractor.context
@ -457,19 +475,16 @@ async def open_feed_bus(
assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername)
bus._subscribers.setdefault(symbol, [])
fqsn = mk_fqsn(brokername, symbol)
entry = bus.feeds.get(symbol)
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# service nursery
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which will persist for
# this `brokerd`.
# allocate a new actor-local stream bus which
# will persist for this `brokerd`.
async with bus.task_lock:
init_msg, first_quotes = await bus.nursery.start(
await bus.nursery.start(
partial(
allocate_persistent_feed,
@ -490,9 +505,30 @@ async def open_feed_bus(
# subscriber
init_msg, first_quotes = bus.feeds[symbol]
msg = init_msg[symbol]
bfqsn = msg['fqsn']
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[fqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'
assert bsym in first_quotes
# we use the broker-specific fqsn (bfqsn) for
# the sampler subscription since the backend isn't (yet)
# expected to append it's own name to the fqsn, so we filter
# on keys which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bfqsn, [])
# send this even to subscribers to existing feed?
# deliver initial info message a first quote asap
await ctx.started((init_msg, first_quotes))
await ctx.started((
init_msg,
first_quotes,
))
if not start_stream:
log.warning(f'Not opening real-time stream for {fqsn}')
@ -505,14 +541,12 @@ async def open_feed_bus(
# re-send to trigger display loop cycle (necessary especially
# when the mkt is closed and no real-time messages are
# expected).
await stream.send(first_quotes)
await stream.send({fqsn: first_quotes})
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10)
cs = await bus.start_task(
uniform_rate_send,
@ -525,12 +559,15 @@ async def open_feed_bus(
else:
sub = (stream, tick_throttle)
subs = bus._subscribers[symbol]
subs = bus._subscribers[bfqsn]
subs.append(sub)
try:
uid = ctx.chan.uid
# ctrl protocol for start/stop of quote streams based on UI
# state (eg. don't need a stream when a symbol isn't being
# displayed).
async for msg in stream:
if msg == 'pause':
@ -555,7 +592,7 @@ async def open_feed_bus(
# n.cancel_scope.cancel()
cs.cancel()
try:
bus._subscribers[symbol].remove(sub)
bus._subscribers[bfqsn].remove(sub)
except ValueError:
log.warning(f'{sub} for {symbol} was already removed?')
@ -681,10 +718,10 @@ async def install_brokerd_search(
@asynccontextmanager
async def open_feed(
brokername: str,
symbols: list[str],
loglevel: Optional[str] = None,
fqsns: list[str],
loglevel: Optional[str] = None,
backpressure: bool = True,
start_stream: bool = True,
tick_throttle: Optional[float] = None, # Hz
@ -694,7 +731,10 @@ async def open_feed(
Open a "data feed" which provides streamed real-time quotes.
'''
sym = symbols[0].lower()
fqsn = fqsns[0].lower()
brokername, key, suffix = uncons_fqsn(fqsn)
bfqsn = fqsn.replace('.' + brokername, '')
try:
mod = get_brokermod(brokername)
@ -715,7 +755,7 @@ async def open_feed(
portal.open_context(
open_feed_bus,
brokername=brokername,
symbol=sym,
symbol=bfqsn,
loglevel=loglevel,
start_stream=start_stream,
tick_throttle=tick_throttle,
@ -732,9 +772,10 @@ async def open_feed(
):
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
token=init_msg[bfqsn]['shm_token'],
readonly=True,
)
assert fqsn in first_quotes
feed = Feed(
name=brokername,
@ -747,14 +788,15 @@ async def open_feed(
)
for sym, data in init_msg.items():
si = data['symbol_info']
symbol = Symbol.from_broker_info(
brokername,
sym,
si,
fqsn = data['fqsn'] + f'.{brokername}'
symbol = Symbol.from_fqsn(
fqsn,
info=si,
)
# symbol.broker_info[brokername] = si
feed.symbols[fqsn] = symbol
feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this
@ -778,8 +820,7 @@ async def open_feed(
@asynccontextmanager
async def maybe_open_feed(
brokername: str,
symbols: list[str],
fqsns: list[str],
loglevel: Optional[str] = None,
**kwargs,
@ -791,13 +832,12 @@ async def maybe_open_feed(
in a tractor broadcast receiver.
'''
sym = symbols[0].lower()
fqsn = fqsns[0]
async with maybe_open_context(
acm_func=open_feed,
kwargs={
'brokername': brokername,
'symbols': [sym],
'fqsns': fqsns,
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),
@ -805,11 +845,11 @@ async def maybe_open_feed(
'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True),
},
key=sym,
key=fqsn,
) as (cache_hit, feed):
if cache_hit:
log.info(f'Using cached feed for {brokername}.{sym}')
log.info(f'Using cached feed for {fqsn}')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with feed.stream.subscribe() as bstream: