ib: implement `FeedInit` style quote stream setup
As per the new market info packing schema this patch almost gets it completely compatible and useful via implementing the `get_mkt_info()` backend module endpoint B) There's still some questions around `MktPair.src` since all the contract search machinery in the ib api isn't expecting a fiat currency in the symbol key: for ex. `mnq/usd.cme.20230616.ib` has no handling for the `[/]usd` part. For now i'm just excluding the `.src` since it requires extra parsing on quotes-feed requests even though this is also currently breaking forex pairs (idealpro or wtv). I think ideally we do move to a `dst/src.<venue>.<etc..>` style but it's going to require adjustments to all the existing crypto backends.. This also allows dropping the old `mk_init_msgs()` closure.master
parent
147e1baee9
commit
07b7d1d229
|
@ -1164,6 +1164,12 @@ def norm_trade_records(
|
||||||
exch = record['exchange']
|
exch = record['exchange']
|
||||||
lexch = record.get('listingExchange')
|
lexch = record.get('listingExchange')
|
||||||
|
|
||||||
|
# NOTE: remove null values since `tomlkit` can't serialize
|
||||||
|
# them to file.
|
||||||
|
dnc = record.pop('deltaNeutralContract', False)
|
||||||
|
if dnc is not None:
|
||||||
|
record['deltaNeutralContract'] = dnc
|
||||||
|
|
||||||
suffix = lexch or exch
|
suffix = lexch or exch
|
||||||
symbol = record['symbol']
|
symbol = record['symbol']
|
||||||
|
|
||||||
|
|
|
@ -58,13 +58,18 @@ from .api import (
|
||||||
open_client_proxies,
|
open_client_proxies,
|
||||||
get_preferred_data_client,
|
get_preferred_data_client,
|
||||||
Ticker,
|
Ticker,
|
||||||
RequestError,
|
|
||||||
Contract,
|
Contract,
|
||||||
|
RequestError,
|
||||||
)
|
)
|
||||||
from ._util import data_reset_hack
|
from ._util import data_reset_hack
|
||||||
from piker._cacheables import (
|
from piker._cacheables import (
|
||||||
async_lifo_cache,
|
async_lifo_cache,
|
||||||
)
|
)
|
||||||
|
from piker.accounting import (
|
||||||
|
Asset,
|
||||||
|
MktPair,
|
||||||
|
)
|
||||||
|
from piker.data.validate import FeedInit
|
||||||
|
|
||||||
|
|
||||||
# https://interactivebrokers.github.io/tws-api/tick_types.html
|
# https://interactivebrokers.github.io/tws-api/tick_types.html
|
||||||
|
@ -115,7 +120,9 @@ async def open_data_client() -> MethodProxy:
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_history_client(
|
async def open_history_client(
|
||||||
fqsn: str,
|
fqme: str,
|
||||||
|
|
||||||
|
# mkt: MktPair | None = None,
|
||||||
|
|
||||||
) -> tuple[Callable, int]:
|
) -> tuple[Callable, int]:
|
||||||
'''
|
'''
|
||||||
|
@ -134,6 +141,11 @@ async def open_history_client(
|
||||||
|
|
||||||
async with open_data_client() as proxy:
|
async with open_data_client() as proxy:
|
||||||
|
|
||||||
|
# TODO: maybe strip the `MktPair.src: Asset` key here?
|
||||||
|
# see the comment below..
|
||||||
|
# if mkt is not None:
|
||||||
|
# fqme: str = mkt.fqme.remove(mkt.src.name)
|
||||||
|
|
||||||
max_timeout: float = 2.
|
max_timeout: float = 2.
|
||||||
mean: float = 0
|
mean: float = 0
|
||||||
count: int = 0
|
count: int = 0
|
||||||
|
@ -141,10 +153,10 @@ async def open_history_client(
|
||||||
head_dt: None | datetime = None
|
head_dt: None | datetime = None
|
||||||
if (
|
if (
|
||||||
# fx cons seem to not provide this endpoint?
|
# fx cons seem to not provide this endpoint?
|
||||||
'idealpro' not in fqsn
|
'idealpro' not in fqme
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
head_dt = await proxy.get_head_time(fqsn=fqsn)
|
head_dt = await proxy.get_head_time(fqsn=fqme)
|
||||||
except RequestError:
|
except RequestError:
|
||||||
head_dt = None
|
head_dt = None
|
||||||
|
|
||||||
|
@ -159,7 +171,7 @@ async def open_history_client(
|
||||||
query_start = time.time()
|
query_start = time.time()
|
||||||
out, timedout = await get_bars(
|
out, timedout = await get_bars(
|
||||||
proxy,
|
proxy,
|
||||||
fqsn,
|
fqme,
|
||||||
timeframe,
|
timeframe,
|
||||||
end_dt=end_dt,
|
end_dt=end_dt,
|
||||||
)
|
)
|
||||||
|
@ -517,7 +529,9 @@ async def get_bars(
|
||||||
return result, data_cs is not None
|
return result, data_cs is not None
|
||||||
|
|
||||||
|
|
||||||
asset_type_map = {
|
# re-mapping to piker asset type names
|
||||||
|
# https://github.com/erdewit/ib_insync/blob/master/ib_insync/contract.py#L113
|
||||||
|
_asset_type_map = {
|
||||||
'STK': 'stock',
|
'STK': 'stock',
|
||||||
'OPT': 'option',
|
'OPT': 'option',
|
||||||
'FUT': 'future',
|
'FUT': 'future',
|
||||||
|
@ -558,7 +572,7 @@ async def _setup_quote_stream(
|
||||||
'294', # Trade rate / minute
|
'294', # Trade rate / minute
|
||||||
'295', # Vlm rate / minute
|
'295', # Vlm rate / minute
|
||||||
),
|
),
|
||||||
contract: Optional[Contract] = None,
|
contract: Contract | None = None,
|
||||||
|
|
||||||
) -> trio.abc.ReceiveChannel:
|
) -> trio.abc.ReceiveChannel:
|
||||||
'''
|
'''
|
||||||
|
@ -745,19 +759,19 @@ async def get_mkt_info(
|
||||||
|
|
||||||
proxy: MethodProxy | None = None,
|
proxy: MethodProxy | None = None,
|
||||||
|
|
||||||
) -> tuple[MktPair, Pair]:
|
) -> tuple[MktPair, ibis.ContractDetails]:
|
||||||
|
|
||||||
# we don't need to split off any fqme broker part?
|
# XXX: we don't need to split off any fqme broker part?
|
||||||
# bs_fqme, _, broker = fqme.partition('.')
|
# bs_fqme, _, broker = fqme.partition('.')
|
||||||
|
|
||||||
proxy: MethodProxy
|
proxy: MethodProxy
|
||||||
|
get_details: bool = False
|
||||||
if proxy is not None:
|
if proxy is not None:
|
||||||
client_ctx = nullcontext(proxy)
|
client_ctx = nullcontext(proxy)
|
||||||
else:
|
else:
|
||||||
client_ctx = open_data_client
|
client_ctx = open_data_client
|
||||||
|
|
||||||
async with client_ctx as proxy:
|
async with client_ctx as proxy:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
(
|
(
|
||||||
con, # Contract
|
con, # Contract
|
||||||
|
@ -767,27 +781,61 @@ async def get_mkt_info(
|
||||||
log.exception(f'Proxy is ded {proxy._aio_ns}')
|
log.exception(f'Proxy is ded {proxy._aio_ns}')
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# pair: Pair = await client.exch_info(fqme.upper())
|
# TODO: more consistent field translation
|
||||||
|
init_info: dict = {}
|
||||||
|
atype = _asset_type_map[con.secType]
|
||||||
|
|
||||||
# mkt = MktPair(
|
venue = con.primaryExchange or con.exchange
|
||||||
# dst=Asset(
|
price_tick: Decimal = Decimal(str(details.minTick))
|
||||||
# name=pair.baseAsset,
|
|
||||||
# atype='crypto',
|
|
||||||
# tx_tick=digits_to_dec(pair.baseAssetPrecision),
|
|
||||||
# ),
|
|
||||||
# src=Asset(
|
|
||||||
# name=pair.quoteAsset,
|
|
||||||
# atype='crypto',
|
|
||||||
# tx_tick=digits_to_dec(pair.quoteAssetPrecision),
|
|
||||||
# ),
|
|
||||||
# price_tick=pair.price_tick,
|
|
||||||
# size_tick=pair.size_tick,
|
|
||||||
# bs_mktid=pair.symbol,
|
|
||||||
# broker='binance',
|
|
||||||
# )
|
|
||||||
|
|
||||||
# return both
|
if atype == 'stock':
|
||||||
return con, details
|
# XXX: GRRRR they don't support fractional share sizes for
|
||||||
|
# stocks from the API?!
|
||||||
|
# if con.secType == 'STK':
|
||||||
|
size_tick = Decimal('1')
|
||||||
|
else:
|
||||||
|
size_tick: Decimal = Decimal(str(details.minSize).rstrip('0'))
|
||||||
|
# |-> TODO: there is also the Contract.sizeIncrement, bt wtf is it?
|
||||||
|
|
||||||
|
# NOTE: this is duplicate from the .broker.norm_trade_records()
|
||||||
|
# routine, we should factor all this parsing somewhere..
|
||||||
|
expiry_str = str(con.lastTradeDateOrContractMonth)
|
||||||
|
# if expiry:
|
||||||
|
# expiry_str: str = str(pendulum.parse(
|
||||||
|
# str(expiry).strip(' ')
|
||||||
|
# ))
|
||||||
|
|
||||||
|
mkt = MktPair(
|
||||||
|
dst=Asset(
|
||||||
|
name=con.symbol.lower(),
|
||||||
|
atype=atype,
|
||||||
|
tx_tick=size_tick,
|
||||||
|
),
|
||||||
|
|
||||||
|
# TODO: currently we can't pass the fiat src asset because
|
||||||
|
# then we'll get a `MNQUSD` request for history data..
|
||||||
|
# we need to figure out how we're going to handle this (later?)
|
||||||
|
# but likely we want all backends to eventually handle
|
||||||
|
# ``dst/src.venue.`` style?
|
||||||
|
# src=Asset(
|
||||||
|
# name=str(con.currency),
|
||||||
|
# atype='fiat',
|
||||||
|
# tx_tick=Decimal('0.01'), # right?
|
||||||
|
# ),
|
||||||
|
|
||||||
|
price_tick=price_tick,
|
||||||
|
size_tick=size_tick,
|
||||||
|
|
||||||
|
bs_mktid=str(con.conId),
|
||||||
|
venue=str(venue),
|
||||||
|
expiry=expiry_str,
|
||||||
|
broker='ib',
|
||||||
|
|
||||||
|
# TODO: options contract info as str?
|
||||||
|
# contract_info=<optionsdetails>
|
||||||
|
)
|
||||||
|
|
||||||
|
return mkt, details
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
@ -812,83 +860,36 @@ async def stream_quotes(
|
||||||
sym = symbols[0]
|
sym = symbols[0]
|
||||||
log.info(f'request for real-time quotes: {sym}')
|
log.info(f'request for real-time quotes: {sym}')
|
||||||
|
|
||||||
|
init_msgs: list[FeedInit] = []
|
||||||
|
|
||||||
proxy: MethodProxy
|
proxy: MethodProxy
|
||||||
|
mkt: MktPair
|
||||||
|
details: ibis.ContractDetails
|
||||||
async with open_data_client() as proxy:
|
async with open_data_client() as proxy:
|
||||||
con, details = await get_mkt_info(sym, proxy=proxy)
|
mkt, details = await get_mkt_info(
|
||||||
|
sym,
|
||||||
|
proxy=proxy, # passed to avoid implicit client load
|
||||||
|
)
|
||||||
|
|
||||||
first_ticker = await proxy.get_quote(contract=con)
|
init_msg = FeedInit(mkt_info=mkt)
|
||||||
first_quote = normalize(first_ticker)
|
|
||||||
|
has_vlm: bool = True
|
||||||
|
if mkt.dst.atype in {
|
||||||
|
'forex',
|
||||||
|
'index',
|
||||||
|
'commodity',
|
||||||
|
}:
|
||||||
|
has_vlm = False
|
||||||
|
# tell sampler config that it shouldn't do vlm summing.
|
||||||
|
init_msg.shm_write_opts['sum_tick_vlm'] = False
|
||||||
|
|
||||||
|
init_msgs.append(init_msg)
|
||||||
|
|
||||||
|
con: Contract = details.contract
|
||||||
|
first_ticker: Ticker = await proxy.get_quote(contract=con)
|
||||||
|
first_quote: dict = normalize(first_ticker)
|
||||||
log.runtime(f'FIRST QUOTE: {first_quote}')
|
log.runtime(f'FIRST QUOTE: {first_quote}')
|
||||||
|
|
||||||
def mk_init_msgs() -> dict[str, dict]:
|
|
||||||
'''
|
|
||||||
Collect a bunch of meta-data useful for feed startup and
|
|
||||||
pack in a `dict`-msg.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# pass back some symbol info like min_tick, trading_hours, etc.
|
|
||||||
con: Contract = details.contract
|
|
||||||
syminfo = asdict(details)
|
|
||||||
syminfo.update(syminfo['contract'])
|
|
||||||
|
|
||||||
# nested dataclass we probably don't need and that won't IPC
|
|
||||||
# serialize
|
|
||||||
syminfo.pop('secIdList')
|
|
||||||
|
|
||||||
# TODO: more consistent field translation
|
|
||||||
atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
|
|
||||||
|
|
||||||
if atype in {
|
|
||||||
'forex',
|
|
||||||
'index',
|
|
||||||
'commodity',
|
|
||||||
}:
|
|
||||||
syminfo['no_vlm'] = True
|
|
||||||
|
|
||||||
# XXX: pretty sure we don't need this any more right?
|
|
||||||
# for stocks it seems TWS reports too small a tick size
|
|
||||||
# such that you can't submit orders with that granularity?
|
|
||||||
# min_price_tick = Decimal('0.01') if atype == 'stock' else 0
|
|
||||||
# price_tick = max(price_tick, min_tick)
|
|
||||||
|
|
||||||
price_tick: Decimal = Decimal(str(syminfo['minTick']))
|
|
||||||
size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0'))
|
|
||||||
|
|
||||||
# XXX: GRRRR they don't support fractional share sizes for
|
|
||||||
# stocks from the API?!
|
|
||||||
if con.secType == 'STK':
|
|
||||||
size_tick = Decimal('1')
|
|
||||||
|
|
||||||
syminfo['price_tick_size'] = price_tick
|
|
||||||
# NOTE: as you'd expect for "legacy" assets, the "volume
|
|
||||||
# precision" is normally discreet.
|
|
||||||
syminfo['lot_tick_size'] = size_tick
|
|
||||||
|
|
||||||
# should be at top level right?
|
|
||||||
syminfo['bs_mktid'] = con.conId
|
|
||||||
|
|
||||||
# ibclient = proxy._aio_ns.ib.client
|
|
||||||
# host, port = ibclient.host, ibclient.port
|
|
||||||
fqsn = first_quote['fqsn']
|
|
||||||
|
|
||||||
# TODO: for loop through all symbols passed in
|
|
||||||
init_msgs: dict[str, dict] = {
|
|
||||||
# pass back token, and bool, signalling if we're the writer
|
|
||||||
# and that history has been written
|
|
||||||
sym: {
|
|
||||||
'symbol_info': syminfo,
|
|
||||||
'fqsn': fqsn,
|
|
||||||
'bs_mktid': con.conId,
|
|
||||||
},
|
|
||||||
# 'status': {
|
|
||||||
# 'data_ep': f'{host}:{port}',
|
|
||||||
# },
|
|
||||||
|
|
||||||
}
|
|
||||||
return init_msgs, syminfo
|
|
||||||
|
|
||||||
init_msgs, syminfo = mk_init_msgs()
|
|
||||||
|
|
||||||
# TODO: we should instead spawn a task that waits on a feed to start
|
# TODO: we should instead spawn a task that waits on a feed to start
|
||||||
# and let it wait indefinitely..instead of this hard coded stuff.
|
# and let it wait indefinitely..instead of this hard coded stuff.
|
||||||
with trio.move_on_after(1):
|
with trio.move_on_after(1):
|
||||||
|
@ -954,13 +955,14 @@ async def stream_quotes(
|
||||||
nurse.start_soon(reset_on_feed)
|
nurse.start_soon(reset_on_feed)
|
||||||
|
|
||||||
async with aclosing(stream):
|
async with aclosing(stream):
|
||||||
if syminfo.get('no_vlm', False):
|
# if syminfo.get('no_vlm', False):
|
||||||
|
if not has_vlm:
|
||||||
|
|
||||||
# generally speaking these feeds don't
|
# generally speaking these feeds don't
|
||||||
# include vlm data.
|
# include vlm data.
|
||||||
atype = syminfo['asset_type']
|
atype = mkt.dst.atype
|
||||||
log.info(
|
log.info(
|
||||||
f'No-vlm {sym}@{atype}, skipping quote poll'
|
f'No-vlm {mkt.fqme}@{atype}, skipping quote poll'
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in New Issue