First stage port of `.data.feed` to `MktPair`
Add `MktPair` handling block for when a backend delivers a `mkt_info`-field containing init msg. Adjust the original `Symbol`-style `'symbol_info'` msg processing to do `Decimal` defaults and convert to `MktPair` including slapping in a hacky `_atype: str` field XD General initial name changes to `bs_mktid` and `_fqme` throughout!rekt_pps
parent
7eb0b1d249
commit
2cc80d53ca
|
@ -26,6 +26,7 @@ from collections import (
|
|||
Counter,
|
||||
)
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from decimal import Decimal
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
import time
|
||||
|
@ -71,8 +72,10 @@ from ._sharedmem import (
|
|||
from .ingest import get_ingestormod
|
||||
from .types import Struct
|
||||
from ..accounting._mktinfo import (
|
||||
Asset,
|
||||
MktPair,
|
||||
unpack_fqme,
|
||||
Symbol,
|
||||
unpack_fqsn,
|
||||
)
|
||||
from ._source import base_iohlc_dtype
|
||||
from ..ui import _search
|
||||
|
@ -565,7 +568,7 @@ async def tsdb_backfill(
|
|||
timeframe=timeframe,
|
||||
)
|
||||
|
||||
broker, symbol, expiry = unpack_fqsn(fqsn)
|
||||
broker, symbol, expiry = unpack_fqme(fqsn)
|
||||
try:
|
||||
(
|
||||
latest_start_dt,
|
||||
|
@ -1009,17 +1012,44 @@ async def allocate_persistent_feed(
|
|||
|
||||
# the broker-specific fully qualified symbol name,
|
||||
# but ensure it is lower-cased for external use.
|
||||
bfqsn = msg['fqsn'].lower()
|
||||
bs_mktid = msg['fqsn'].lower()
|
||||
|
||||
# true fqsn including broker/provider suffix
|
||||
fqsn = '.'.join((bfqsn, brokername))
|
||||
# msg['fqsn'] = bfqsn
|
||||
# true fqme including broker/provider suffix
|
||||
fqme = '.'.join((bs_mktid, brokername))
|
||||
|
||||
symbol = Symbol.from_fqsn(
|
||||
fqsn=fqsn,
|
||||
info=msg['symbol_info'],
|
||||
mktinfo = msg.get('mkt_info')
|
||||
if not mktinfo:
|
||||
|
||||
mktinfo = msg['symbol_info']
|
||||
|
||||
# TODO: read out renamed/new tick size fields in block below!
|
||||
price_tick = mktinfo.get(
|
||||
'price_tick_size',
|
||||
Decimal('0.01'),
|
||||
)
|
||||
assert symbol.type_key
|
||||
size_tick = mktinfo.get(
|
||||
'lot_tick_size',
|
||||
Decimal('0.0'),
|
||||
)
|
||||
|
||||
log.warning(f'FQME: {fqme} -> backend needs port to `MktPair`')
|
||||
mkt = MktPair.from_fqme(
|
||||
fqme,
|
||||
price_tick=price_tick,
|
||||
size_tick=size_tick,
|
||||
bs_mktid=bs_mktid,
|
||||
|
||||
_atype=mktinfo['asset_type']
|
||||
)
|
||||
|
||||
else:
|
||||
# the new msg-protocol is to expect an already packed
|
||||
# ``Asset`` and ``MktPair`` object from the backend
|
||||
mkt = mktinfo
|
||||
assert isinstance(mkt, MktPair)
|
||||
assert isinstance(mkt.dst, Asset)
|
||||
|
||||
assert mkt.type_key
|
||||
|
||||
# HISTORY storage, run 2 tasks:
|
||||
# - a history loader / maintainer
|
||||
|
@ -1040,17 +1070,24 @@ async def allocate_persistent_feed(
|
|||
manage_history,
|
||||
mod,
|
||||
bus,
|
||||
fqsn,
|
||||
fqme,
|
||||
some_data_ready,
|
||||
feed_is_live,
|
||||
)
|
||||
|
||||
# yield back control to starting nursery once we receive either
|
||||
# some history or a real-time quote.
|
||||
log.info(f'waiting on history to load: {fqsn}')
|
||||
log.info(f'waiting on history to load: {fqme}')
|
||||
await some_data_ready.wait()
|
||||
|
||||
symbol = Symbol.from_fqsn(
|
||||
fqsn=fqme,
|
||||
info=msg['symbol_info'],
|
||||
)
|
||||
flume = Flume(
|
||||
# TODO: we have to use this for now since currently the
|
||||
# MktPair above doesn't render the correct output key it seems
|
||||
# when we provide the `MktInfo` here?..?
|
||||
symbol=symbol,
|
||||
first_quote=first_quote,
|
||||
_rt_shm_token=rt_shm.token,
|
||||
|
@ -1061,7 +1098,7 @@ async def allocate_persistent_feed(
|
|||
|
||||
# for ambiguous names we simply apply the retreived
|
||||
# feed to that name (for now).
|
||||
bus.feeds[symstr] = bus.feeds[bfqsn] = flume
|
||||
bus.feeds[symstr] = bus.feeds[bs_mktid] = flume
|
||||
|
||||
task_status.started()
|
||||
|
||||
|
@ -1104,7 +1141,7 @@ async def allocate_persistent_feed(
|
|||
# start sample loop and shm incrementer task for OHLC style sampling
|
||||
# at the above registered step periods.
|
||||
try:
|
||||
log.info(f'Starting sampler task for {fqsn}')
|
||||
log.info(f'Starting sampler task for {fqme}')
|
||||
await sample_and_broadcast(
|
||||
bus,
|
||||
rt_shm,
|
||||
|
@ -1114,7 +1151,7 @@ async def allocate_persistent_feed(
|
|||
sum_tick_vlm
|
||||
)
|
||||
finally:
|
||||
log.warning(f'{fqsn} feed task terminated')
|
||||
log.warning(f'{fqme} feed task terminated')
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
@ -1197,22 +1234,22 @@ async def open_feed_bus(
|
|||
# subscriber
|
||||
flume = bus.feeds[symbol]
|
||||
sym = flume.symbol
|
||||
bfqsn = sym.key
|
||||
bs_mktid = sym.key
|
||||
fqsn = sym.fqsn # true fqsn
|
||||
assert bfqsn in fqsn and brokername in fqsn
|
||||
assert bs_mktid in fqsn and brokername in fqsn
|
||||
|
||||
if sym.suffix:
|
||||
bfqsn = fqsn.removesuffix(f'.{brokername}')
|
||||
log.warning(f'{brokername} expanded symbol {symbol} -> {bfqsn}')
|
||||
bs_mktid = fqsn.removesuffix(f'.{brokername}')
|
||||
log.warning(f'{brokername} expanded symbol {symbol} -> {bs_mktid}')
|
||||
|
||||
# pack for ``.started()`` sync msg
|
||||
flumes[fqsn] = flume
|
||||
|
||||
# we use the broker-specific fqsn (bfqsn) for
|
||||
# the sampler subscription since the backend isn't (yet)
|
||||
# expected to append it's own name to the fqsn, so we filter
|
||||
# on keys which *do not* include that name (e.g .ib) .
|
||||
bus._subscribers.setdefault(bfqsn, set())
|
||||
# we use the broker-specific market id (bs_mktid) for the
|
||||
# sampler subscription since the backend isn't (yet) expected to
|
||||
# append it's own name to the fqsn, so we filter on keys which
|
||||
# *do not* include that name (e.g .ib) .
|
||||
bus._subscribers.setdefault(bs_mktid, set())
|
||||
|
||||
# sync feed subscribers with flume handles
|
||||
await ctx.started(
|
||||
|
@ -1276,9 +1313,9 @@ async def open_feed_bus(
|
|||
# maybe use the current task-id to key the sub list that's
|
||||
# added / removed? Or maybe we can add a general
|
||||
# pause-resume by sub-key api?
|
||||
bfqsn = fqsn.removesuffix(f'.{brokername}')
|
||||
local_subs.setdefault(bfqsn, set()).add(sub)
|
||||
bus.add_subs(bfqsn, {sub})
|
||||
bs_mktid = fqsn.removesuffix(f'.{brokername}')
|
||||
local_subs.setdefault(bs_mktid, set()).add(sub)
|
||||
bus.add_subs(bs_mktid, {sub})
|
||||
|
||||
# sync caller with all subs registered state
|
||||
sub_registered.set()
|
||||
|
@ -1291,16 +1328,16 @@ async def open_feed_bus(
|
|||
async for msg in stream:
|
||||
|
||||
if msg == 'pause':
|
||||
for bfqsn, subs in local_subs.items():
|
||||
for bs_mktid, subs in local_subs.items():
|
||||
log.info(
|
||||
f'Pausing {bfqsn} feed for {uid}')
|
||||
bus.remove_subs(bfqsn, subs)
|
||||
f'Pausing {bs_mktid} feed for {uid}')
|
||||
bus.remove_subs(bs_mktid, subs)
|
||||
|
||||
elif msg == 'resume':
|
||||
for bfqsn, subs in local_subs.items():
|
||||
for bs_mktid, subs in local_subs.items():
|
||||
log.info(
|
||||
f'Resuming {bfqsn} feed for {uid}')
|
||||
bus.add_subs(bfqsn, subs)
|
||||
f'Resuming {bs_mktid} feed for {uid}')
|
||||
bus.add_subs(bs_mktid, subs)
|
||||
|
||||
else:
|
||||
raise ValueError(msg)
|
||||
|
@ -1314,8 +1351,8 @@ async def open_feed_bus(
|
|||
cs.cancel()
|
||||
|
||||
# drop all subs for this task from the bus
|
||||
for bfqsn, subs in local_subs.items():
|
||||
bus.remove_subs(bfqsn, subs)
|
||||
for bs_mktid, subs in local_subs.items():
|
||||
bus.remove_subs(bs_mktid, subs)
|
||||
|
||||
|
||||
class Feed(Struct):
|
||||
|
@ -1512,7 +1549,7 @@ async def open_feed(
|
|||
feed = Feed()
|
||||
|
||||
for fqsn in fqsns:
|
||||
brokername, key, suffix = unpack_fqsn(fqsn)
|
||||
brokername, key, suffix = unpack_fqme(fqsn)
|
||||
bfqsn = fqsn.replace('.' + brokername, '')
|
||||
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue