First stage port of `.data.feed` to `MktPair`

Add `MktPair` handling block for when a backend delivers
a `mkt_info`-field containing init msg. Adjust the original
`Symbol`-style `'symbol_info'` msg processing to do `Decimal` defaults
and convert to `MktPair` including slapping in a hacky `_atype: str`
field XD

General initial name changes to `bs_mktid` and `_fqme` throughout!
pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-21 14:09:57 -04:00
parent da7371988a
commit 8b0aead72e
1 changed files with 74 additions and 37 deletions

View File

@ -26,6 +26,7 @@ from collections import (
Counter,
)
from contextlib import asynccontextmanager as acm
from decimal import Decimal
from datetime import datetime
from functools import partial
import time
@ -71,8 +72,10 @@ from ._sharedmem import (
from .ingest import get_ingestormod
from .types import Struct
from ..accounting._mktinfo import (
Asset,
MktPair,
unpack_fqme,
Symbol,
unpack_fqsn,
)
from ._source import base_iohlc_dtype
from ..ui import _search
@ -565,7 +568,7 @@ async def tsdb_backfill(
timeframe=timeframe,
)
broker, symbol, expiry = unpack_fqsn(fqsn)
broker, symbol, expiry = unpack_fqme(fqsn)
try:
(
latest_start_dt,
@ -1009,17 +1012,44 @@ async def allocate_persistent_feed(
# the broker-specific fully qualified symbol name,
# but ensure it is lower-cased for external use.
bfqsn = msg['fqsn'].lower()
bs_mktid = msg['fqsn'].lower()
# true fqsn including broker/provider suffix
fqsn = '.'.join((bfqsn, brokername))
# msg['fqsn'] = bfqsn
# true fqme including broker/provider suffix
fqme = '.'.join((bs_mktid, brokername))
symbol = Symbol.from_fqsn(
fqsn=fqsn,
info=msg['symbol_info'],
)
assert symbol.type_key
mktinfo = msg.get('mkt_info')
if not mktinfo:
mktinfo = msg['symbol_info']
# TODO: read out renamed/new tick size fields in block below!
price_tick = mktinfo.get(
'price_tick_size',
Decimal('0.01'),
)
size_tick = mktinfo.get(
'lot_tick_size',
Decimal('0.0'),
)
log.warning(f'FQME: {fqme} -> backend needs port to `MktPair`')
mkt = MktPair.from_fqme(
fqme,
price_tick=price_tick,
size_tick=size_tick,
bs_mktid=bs_mktid,
_atype=mktinfo['asset_type']
)
else:
# the new msg-protocol is to expect an already packed
# ``Asset`` and ``MktPair`` object from the backend
mkt = mktinfo
assert isinstance(mkt, MktPair)
assert isinstance(mkt.dst, Asset)
assert mkt.type_key
# HISTORY storage, run 2 tasks:
# - a history loader / maintainer
@ -1040,17 +1070,24 @@ async def allocate_persistent_feed(
manage_history,
mod,
bus,
fqsn,
fqme,
some_data_ready,
feed_is_live,
)
# yield back control to starting nursery once we receive either
# some history or a real-time quote.
log.info(f'waiting on history to load: {fqsn}')
log.info(f'waiting on history to load: {fqme}')
await some_data_ready.wait()
symbol = Symbol.from_fqsn(
fqsn=fqme,
info=msg['symbol_info'],
)
flume = Flume(
# TODO: we have to use this for now since currently the
# MktPair above doesn't render the correct output key it seems
# when we provide the `MktInfo` here?..?
symbol=symbol,
first_quote=first_quote,
_rt_shm_token=rt_shm.token,
@ -1061,7 +1098,7 @@ async def allocate_persistent_feed(
# for ambiguous names we simply apply the retreived
# feed to that name (for now).
bus.feeds[symstr] = bus.feeds[bfqsn] = flume
bus.feeds[symstr] = bus.feeds[bs_mktid] = flume
task_status.started()
@ -1104,7 +1141,7 @@ async def allocate_persistent_feed(
# start sample loop and shm incrementer task for OHLC style sampling
# at the above registered step periods.
try:
log.info(f'Starting sampler task for {fqsn}')
log.info(f'Starting sampler task for {fqme}')
await sample_and_broadcast(
bus,
rt_shm,
@ -1114,7 +1151,7 @@ async def allocate_persistent_feed(
sum_tick_vlm
)
finally:
log.warning(f'{fqsn} feed task terminated')
log.warning(f'{fqme} feed task terminated')
@tractor.context
@ -1197,22 +1234,22 @@ async def open_feed_bus(
# subscriber
flume = bus.feeds[symbol]
sym = flume.symbol
bfqsn = sym.key
bs_mktid = sym.key
fqsn = sym.fqsn # true fqsn
assert bfqsn in fqsn and brokername in fqsn
assert bs_mktid in fqsn and brokername in fqsn
if sym.suffix:
bfqsn = fqsn.removesuffix(f'.{brokername}')
log.warning(f'{brokername} expanded symbol {symbol} -> {bfqsn}')
bs_mktid = fqsn.removesuffix(f'.{brokername}')
log.warning(f'{brokername} expanded symbol {symbol} -> {bs_mktid}')
# pack for ``.started()`` sync msg
flumes[fqsn] = flume
# we use the broker-specific fqsn (bfqsn) for
# the sampler subscription since the backend isn't (yet)
# expected to append it's own name to the fqsn, so we filter
# on keys which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bfqsn, set())
# we use the broker-specific market id (bs_mktid) for the
# sampler subscription since the backend isn't (yet) expected to
# append it's own name to the fqsn, so we filter on keys which
# *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bs_mktid, set())
# sync feed subscribers with flume handles
await ctx.started(
@ -1276,9 +1313,9 @@ async def open_feed_bus(
# maybe use the current task-id to key the sub list that's
# added / removed? Or maybe we can add a general
# pause-resume by sub-key api?
bfqsn = fqsn.removesuffix(f'.{brokername}')
local_subs.setdefault(bfqsn, set()).add(sub)
bus.add_subs(bfqsn, {sub})
bs_mktid = fqsn.removesuffix(f'.{brokername}')
local_subs.setdefault(bs_mktid, set()).add(sub)
bus.add_subs(bs_mktid, {sub})
# sync caller with all subs registered state
sub_registered.set()
@ -1291,16 +1328,16 @@ async def open_feed_bus(
async for msg in stream:
if msg == 'pause':
for bfqsn, subs in local_subs.items():
for bs_mktid, subs in local_subs.items():
log.info(
f'Pausing {bfqsn} feed for {uid}')
bus.remove_subs(bfqsn, subs)
f'Pausing {bs_mktid} feed for {uid}')
bus.remove_subs(bs_mktid, subs)
elif msg == 'resume':
for bfqsn, subs in local_subs.items():
for bs_mktid, subs in local_subs.items():
log.info(
f'Resuming {bfqsn} feed for {uid}')
bus.add_subs(bfqsn, subs)
f'Resuming {bs_mktid} feed for {uid}')
bus.add_subs(bs_mktid, subs)
else:
raise ValueError(msg)
@ -1314,8 +1351,8 @@ async def open_feed_bus(
cs.cancel()
# drop all subs for this task from the bus
for bfqsn, subs in local_subs.items():
bus.remove_subs(bfqsn, subs)
for bs_mktid, subs in local_subs.items():
bus.remove_subs(bs_mktid, subs)
class Feed(Struct):
@ -1512,7 +1549,7 @@ async def open_feed(
feed = Feed()
for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqsn)
brokername, key, suffix = unpack_fqme(fqsn)
bfqsn = fqsn.replace('.' + brokername, '')
try: