Support "expiry" suffixes for derivatives with ib

To start we only have futes working but this allows both searching
and loading multiple expiries of the same instrument by specifying
different expiries with a `.<expiry>` suffix in the symbol key (eg.
`mnq.globex.20220617`). This also paves the way for options contracts
which will need something similar plus a strike property. This change
set also required a patch to `ib_insync` to allow retrieving multiple
"ambiguous" contracts from the `IB.reqContractDetailsAcync()` method,
see https://github.com/erdewit/ib_insync/pull/454 for further discussion
since the approach here might change.

This patch also includes a lot of serious reworking of some `trio`-`asyncio`
integration to use the newer `tractor.to_asyncio.open_channel_from()`
api and use it (with a relay task) to open a persistent connection with
an in-actor `ib_insync` `Client` mostly for history requests.

Deats,
- annot the module with a `_infect_asyncio: bool` for `tractor` spawning
- add a futes venu list
- support ambiguous futes contracts lookups so that all expiries will
  show in search
- support both continuous and specific expiry fute contract
  qualification
- allow searching with "fqsn" keys
- don't crash on "data not found" errors in history requests
- move all quotes msg "topic-key" generation (which should now be
  a broker-specific fqsn) and per-contract quote processing into
  `normalize()`
- set the fqsn key in the symbol info init msg
- use `open_client_proxy()` in bars backfiller endpoint
- include expiry suffix in position update keys
mkts_backup
Tyler Goodlet 2022-03-18 09:25:39 -04:00
parent 990417b172
commit fa8e4f7c27
1 changed files with 261 additions and 142 deletions

View File

@ -60,7 +60,6 @@ import numpy as np
from .. import config from .. import config
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
# from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df from ..data._source import from_df
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ._util import SymbolNotFound, NoData from ._util import SymbolNotFound, NoData
@ -116,9 +115,14 @@ _search_conf = {
} }
# annotation to let backend agnostic code
# know if ``brokerd`` should be spawned with
# ``tractor``'s aio mode.
_infect_asyncio: bool = True
# overrides to sidestep pretty questionable design decisions in # overrides to sidestep pretty questionable design decisions in
# ``ib_insync``: # ``ib_insync``:
class NonShittyWrapper(Wrapper): class NonShittyWrapper(Wrapper):
def tcpDataArrived(self): def tcpDataArrived(self):
"""Override time stamps to be floats for now. """Override time stamps to be floats for now.
@ -173,6 +177,13 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
} }
_futes_venues = (
'GLOBEX',
'NYMEX',
'CME',
'CMECRYPTO',
)
_adhoc_futes_set = { _adhoc_futes_set = {
# equities # equities
@ -243,7 +254,7 @@ class Client:
async def bars( async def bars(
self, self,
symbol: str, fqsn: str,
# EST in ISO 8601 format is required... below is EPOCH # EST in ISO 8601 format is required... below is EPOCH
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
end_dt: Union[datetime, str] = "", end_dt: Union[datetime, str] = "",
@ -254,7 +265,7 @@ class Client:
is_paid_feed: bool = False, # placeholder is_paid_feed: bool = False, # placeholder
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
''' '''
Retreive OHLCV bars for a symbol over a range to the present. Retreive OHLCV bars for a fqsn over a range to the present.
''' '''
bars_kwargs = {'whatToShow': 'TRADES'} bars_kwargs = {'whatToShow': 'TRADES'}
@ -263,7 +274,7 @@ class Client:
print(f'ENTER BARS {_enters} @ end={end_dt}') print(f'ENTER BARS {_enters} @ end={end_dt}')
_enters += 1 _enters += 1
contract = await self.find_contract(symbol) contract = await self.find_contract(fqsn)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count) # _min = min(2000*100, count)
@ -300,7 +311,7 @@ class Client:
) )
if not bars: if not bars:
# TODO: raise underlying error here # TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?") raise ValueError(f"No bars retreived for {fqsn}?")
# TODO: rewrite this faster with ``numba`` # TODO: rewrite this faster with ``numba``
# convert to pandas dataframe: # convert to pandas dataframe:
@ -342,23 +353,24 @@ class Client:
async def search_stocks( async def search_stocks(
self, self,
pattern: str, pattern: str,
get_details: bool = False, get_details: bool = False,
# how many contracts to search "up to" upto: int = 3, # how many contracts to search "up to"
upto: int = 3,
) -> dict[str, ContractDetails]: ) -> dict[str, ContractDetails]:
"""Search for stocks matching provided ``str`` pattern. '''
Search for stocks matching provided ``str`` pattern.
Return a dictionary of ``upto`` entries worth of contract details. Return a dictionary of ``upto`` entries worth of contract details.
"""
'''
descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
if descriptions is not None: if descriptions is not None:
descrs = descriptions[:upto] descrs = descriptions[:upto]
if get_details: if get_details:
return await self.con_deats([d.contract for d in descrs]) deats = await self.con_deats([d.contract for d in descrs])
return deats
else: else:
results = {} results = {}
@ -368,6 +380,10 @@ class Client:
# from search? # from search?
exch = con.primaryExchange.rsplit('.')[0] exch = con.primaryExchange.rsplit('.')[0]
unique_sym = f'{con.symbol}.{exch}' unique_sym = f'{con.symbol}.{exch}'
expiry = con.lastTradeDateOrContractMonth
if expiry:
unique_sym += f'{expiry}'
results[unique_sym] = {} results[unique_sym] = {}
return results return results
@ -385,26 +401,75 @@ class Client:
# TODO add search though our adhoc-locally defined symbol set # TODO add search though our adhoc-locally defined symbol set
# for futes/cmdtys/ # for futes/cmdtys/
return await self.search_stocks(pattern, upto, get_details=True) results = await self.search_stocks(
pattern,
upto=upto,
get_details=True,
)
async def get_cont_fute( for key, contracts in results.copy().items():
tract = contracts['contract']
sym = tract['symbol']
sectype = tract['secType']
if sectype == 'IND':
results[f'{sym}.IND'] = tract
results.pop(key)
exch = tract['exchange']
if exch in _futes_venues:
# try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = Contract(
'FUT+CONTFUT',
symbol=sym,
exchange=exch,
)
possibles = await self.ib.qualifyContractsAsync(con)
for i, condict in enumerate(sorted(
map(asdict, possibles),
# sort by expiry
key=lambda con: con['lastTradeDateOrContractMonth'],
)):
expiry = condict['lastTradeDateOrContractMonth']
results[f'{sym}.{exch}.{expiry}'] = condict
return results
async def get_fute(
self, self,
symbol: str, symbol: str,
exchange: str, exchange: str,
) -> Contract: expiry: str = '',
"""Get an unqualifed contract for the current "continous" future. front: bool = False,
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)
) -> Contract:
'''
Get an unqualifed contract for the current "continous" future.
'''
# it's the "front" contract returned here # it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] if front:
return ibis.Future(conId=frontcon.conId) con = (await self.ib.qualifyContractsAsync(
ibis.ContFuture(symbol, exchange=exchange)
))[0]
else:
con = (await self.ib.qualifyContractsAsync(
ibis.Future(
symbol,
exchange=exchange,
lastTradeDateOrContractMonth=expiry,
)
))[0]
return con
async def find_contract( async def find_contract(
self, self,
symbol, pattern: str,
currency: str = 'USD', currency: str = 'USD',
**kwargs, **kwargs,
) -> Contract: ) -> Contract:
# TODO: we can't use this currently because # TODO: we can't use this currently because
@ -418,11 +483,20 @@ class Client:
# XXX UPDATE: we can probably do the tick/trades scraping # XXX UPDATE: we can probably do the tick/trades scraping
# inside our eventkit handler instead to bypass this entirely? # inside our eventkit handler instead to bypass this entirely?
if 'ib' in pattern:
from ..data._source import uncons_fqsn
broker, symbol, expiry = uncons_fqsn(pattern)
else:
symbol = pattern
# try: # try:
# # give the cache a go # # give the cache a go
# return self._contracts[symbol] # return self._contracts[symbol]
# except KeyError: # except KeyError:
# log.debug(f'Looking up contract for {symbol}') # log.debug(f'Looking up contract for {symbol}')
expiry: str = ''
if symbol.count('.') > 1:
symbol, _, expiry = symbol.rpartition('.')
# use heuristics to figure out contract "type" # use heuristics to figure out contract "type"
try: try:
@ -431,9 +505,27 @@ class Client:
# likely there's an embedded `.` for a forex pair # likely there's an embedded `.` for a forex pair
breakpoint() breakpoint()
qualify: bool = True
# futes # futes
if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): if exch in _futes_venues:
con = await self.get_cont_fute(symbol=sym, exchange=exch) if expiry:
# get the "front" contract
contract = await self.get_fute(
symbol=sym,
exchange=exch,
expiry=expiry,
)
else:
# get the "front" contract
contract = await self.get_fute(
symbol=sym,
exchange=exch,
front=True,
)
qualify = False
elif exch in ('FOREX'): elif exch in ('FOREX'):
currency = '' currency = ''
@ -473,12 +565,15 @@ class Client:
) )
try: try:
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
if qualify:
contract = (await self.ib.qualifyContractsAsync(con))[0] contract = (await self.ib.qualifyContractsAsync(con))[0]
else:
assert contract
except IndexError: except IndexError:
raise ValueError(f"No contract could be found {con}") raise ValueError(f"No contract could be found {con}")
self._contracts[symbol] = contract self._contracts[pattern] = contract
return contract return contract
async def get_head_time( async def get_head_time(
@ -828,8 +923,8 @@ async def load_aio_clients(
accounts_found: dict[str, Client] = {} accounts_found: dict[str, Client] = {}
if ( if (
client and client.ib.isConnected() or client and client.ib.isConnected()
sockaddr in _scan_ignore or sockaddr in _scan_ignore
): ):
continue continue
@ -1039,8 +1134,12 @@ async def open_aio_client_method_relay(
# relay all method requests to ``asyncio``-side client and # relay all method requests to ``asyncio``-side client and
# deliver back results # deliver back results
while True: while not to_trio._closed:
msg = await from_trio.get() msg = await from_trio.get()
if msg is None:
print('asyncio PROXY-RELAY SHUTDOWN')
break
meth_name, kwargs = msg meth_name, kwargs = msg
meth = getattr(client, meth_name) meth = getattr(client, meth_name)
@ -1071,8 +1170,17 @@ async def open_client_proxy() -> MethodProxy:
yield proxy yield proxy
except RequestError as err: # terminate asyncio side task
code, msg = err.code, err.message await chan.send(None)
except (
RequestError,
BaseException,
)as err:
code = getattr(err, 'code', None)
if code:
msg = err.message
await tractor.breakpoint()
# TODO: retreive underlying ``ib_insync`` error? # TODO: retreive underlying ``ib_insync`` error?
if ( if (
@ -1080,6 +1188,7 @@ async def open_client_proxy() -> MethodProxy:
'HMDS query returned no data' in msg 'HMDS query returned no data' in msg
or 'No market data permissions for' in msg or 'No market data permissions for' in msg
) )
or code == 200
): ):
# these cases should not cause a task crash # these cases should not cause a task crash
log.warning(msg) log.warning(msg)
@ -1088,28 +1197,20 @@ async def open_client_proxy() -> MethodProxy:
raise raise
# @acm @acm
# async def get_client( async def get_client(
# **kwargs, **kwargs,
# ) -> Client: ) -> Client:
# ''' '''
# Init the ``ib_insync`` client in another actor and return Init the ``ib_insync`` client in another actor and return
# a method proxy to it. a method proxy to it.
# ''' '''
# async with (
# maybe_spawn_brokerd(
# brokername='ib',
# infect_asyncio=True,
# **kwargs
# ) as portal,
# ):
# assert 0
# TODO: the IPC via portal relay layer for when this current # TODO: the IPC via portal relay layer for when this current
# actor isn't in aio mode. # actor isn't in aio mode.
# open_client_proxy() as proxy, async with open_client_proxy() as proxy:
# yield proxy yield proxy
# https://interactivebrokers.github.io/tws-api/tick_types.html # https://interactivebrokers.github.io/tws-api/tick_types.html
@ -1137,11 +1238,40 @@ tick_types = {
} }
# TODO: cython/mypyc/numba this!
def normalize( def normalize(
ticker: Ticker, ticker: Ticker,
calc_price: bool = False calc_price: bool = False
) -> dict: ) -> dict:
# should be real volume for this contract by default
calc_price = False
# check for special contract types
con = ticker.contract
if type(con) in (
ibis.Commodity,
ibis.Forex,
):
# commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
# no real volume on this tract
calc_price = True
else:
suffix = con.primaryExchange
if not suffix:
suffix = con.exchange
# append a `.<suffix>` to the returned symbol
# key for derivatives that normally is the expiry
# date key.
expiry = con.lastTradeDateOrContractMonth
if expiry:
suffix += f'.{expiry}'
# convert named tuples to dicts so we send usable keys # convert named tuples to dicts so we send usable keys
new_ticks = [] new_ticks = []
for tick in ticker.ticks: for tick in ticker.ticks:
@ -1170,6 +1300,12 @@ def normalize(
# serialize for transport # serialize for transport
data = asdict(ticker) data = asdict(ticker)
# generate fqsn with possible specialized suffix
# for derivatives.
data['symbol'] = data['fqsn'] = '.'.join(
(con.symbol, suffix)
).lower()
# convert named tuples to dicts for transport # convert named tuples to dicts for transport
tbts = data.get('tickByTicks') tbts = data.get('tickByTicks')
if tbts: if tbts:
@ -1191,7 +1327,7 @@ def normalize(
async def get_bars( async def get_bars(
proxy: MethodProxy, proxy: MethodProxy,
sym: str, fqsn: str,
end_dt: str = "", end_dt: str = "",
) -> (dict, np.ndarray): ) -> (dict, np.ndarray):
@ -1204,15 +1340,15 @@ async def get_bars(
fails = 0 fails = 0
bars: Optional[list] = None bars: Optional[list] = None
for _ in range(2): for _ in range(3):
try: try:
bars, bars_array = await proxy.bars( bars, bars_array = await proxy.bars(
symbol=sym, fqsn=fqsn,
end_dt=end_dt, end_dt=end_dt,
) )
if bars_array is None: if bars_array is None:
raise SymbolNotFound(sym) raise SymbolNotFound(fqsn)
next_dt = bars[0].date next_dt = bars[0].date
print(f'ib datetime {next_dt}') print(f'ib datetime {next_dt}')
@ -1252,7 +1388,7 @@ async def get_bars(
elif 'No market data permissions for' in err.message: elif 'No market data permissions for' in err.message:
# TODO: signalling for no permissions searches # TODO: signalling for no permissions searches
raise NoData(f'Symbol: {sym}') raise NoData(f'Symbol: {fqsn}')
break break
else: else:
@ -1311,7 +1447,7 @@ async def open_history_client(
async def backfill_bars( async def backfill_bars(
sym: str, fqsn: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
# TODO: we want to avoid overrunning the underlying shm array buffer # TODO: we want to avoid overrunning the underlying shm array buffer
@ -1331,7 +1467,9 @@ async def backfill_bars(
https://github.com/pikers/piker/issues/128 https://github.com/pikers/piker/issues/128
''' '''
# async with open_history_client(sym) as proxy: with trio.CancelScope() as cs:
# async with open_history_client(fqsn) as proxy:
async with open_client_proxy() as proxy: async with open_client_proxy() as proxy:
if platform.system() == 'Windows': if platform.system() == 'Windows':
@ -1339,7 +1477,7 @@ async def backfill_bars(
'Decreasing history query count to 4 since, windows...') 'Decreasing history query count to 4 since, windows...')
count = 4 count = 4
out, fails = await get_bars(proxy, sym) out, fails = await get_bars(proxy, fqsn)
if out is None: if out is None:
raise RuntimeError("Could not pull currrent history?!") raise RuntimeError("Could not pull currrent history?!")
@ -1351,14 +1489,12 @@ async def backfill_bars(
# write historical data to buffer # write historical data to buffer
shm.push(bars_array) shm.push(bars_array)
with trio.CancelScope() as cs:
task_status.started(cs) task_status.started(cs)
i = 0 i = 0
while i < count: while i < count:
out, fails = await get_bars(proxy, sym, end_dt=next_dt) out, fails = await get_bars(proxy, fqsn, end_dt=next_dt)
if fails is None or fails > 1: if fails is None or fails > 1:
break break
@ -1430,8 +1566,10 @@ async def _setup_quote_stream(
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
) -> trio.abc.ReceiveChannel: ) -> trio.abc.ReceiveChannel:
"""Stream a ticker using the std L1 api. '''
""" Stream a ticker using the std L1 api.
'''
global _quote_streams global _quote_streams
to_trio.send_nowait(None) to_trio.send_nowait(None)
@ -1519,7 +1657,10 @@ async def open_aio_quote_stream(
if from_aio: if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer # if we already have a cached feed deliver a rx side clone to consumer
async with broadcast_receiver(from_aio) as from_aio: async with broadcast_receiver(
from_aio,
2**6,
) as from_aio:
yield from_aio yield from_aio
return return
@ -1555,17 +1696,13 @@ async def stream_quotes(
''' '''
# TODO: support multiple subscriptions # TODO: support multiple subscriptions
sym = symbols[0] sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
contract, first_ticker, details = await _trio_run_client_method( con, first_ticker, details = await _trio_run_client_method(
method='get_sym_details', method='get_sym_details',
symbol=sym, symbol=sym,
) )
first_quote = normalize(first_ticker)
with trio.move_on_after(1):
contract, first_ticker, details = await _trio_run_client_method(
method='get_quote',
symbol=sym,
)
def mk_init_msgs() -> dict[str, dict]: def mk_init_msgs() -> dict[str, dict]:
# pass back some symbol info like min_tick, trading_hours, etc. # pass back some symbol info like min_tick, trading_hours, etc.
@ -1593,37 +1730,18 @@ async def stream_quotes(
# and that history has been written # and that history has been written
sym: { sym: {
'symbol_info': syminfo, 'symbol_info': syminfo,
'fqsn': first_quote['fqsn'],
} }
} }
return init_msgs return init_msgs
init_msgs = mk_init_msgs() init_msgs = mk_init_msgs()
con = first_ticker.contract
# should be real volume for this contract by default with trio.move_on_after(1):
calc_price = False contract, first_ticker, details = await _trio_run_client_method(
method='get_quote',
# check for special contract types symbol=sym,
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): )
suffix = con.primaryExchange
if not suffix:
suffix = con.exchange
else:
# commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
# no real volume on this tract
calc_price = True
quote = normalize(first_ticker, calc_price=calc_price)
con = quote['contract']
topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic
# pass first quote asap
first_quote = {topic: quote}
# it might be outside regular trading hours so see if we can at # it might be outside regular trading hours so see if we can at
# least grab history. # least grab history.
@ -1639,30 +1757,32 @@ async def stream_quotes(
return # we never expect feed to come up? return # we never expect feed to come up?
async with open_aio_quote_stream( async with open_aio_quote_stream(
symbol=sym, contract=contract symbol=sym,
contract=con,
) as stream: ) as stream:
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []
log.debug(f"First ticker received {quote}")
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
async with aclosing(stream): async with aclosing(stream):
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): if type(first_ticker.contract) not in (
# suffix = 'exchange' ibis.Commodity,
# calc_price = False # should be real volume for contract ibis.Forex
):
# wait for real volume on feed (trading might be closed) # wait for real volume on feed (trading might be closed)
while True: while True:
ticker = await stream.receive() ticker = await stream.receive()
# for a real volume contract we rait for the first # for a real volume contract we rait for the first
# "real" trade to take place # "real" trade to take place
if not calc_price and not ticker.rtTime: if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real # spin consuming tickers until we get a real
# market datum # market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
@ -1677,21 +1797,16 @@ async def stream_quotes(
# ``aclosing()`` above? # ``aclosing()`` above?
break break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live # tell caller quotes are now coming in live
feed_is_live.set() feed_is_live.set()
# last = time.time() # last = time.time()
async for ticker in stream: async for ticker in stream:
# print(f'ticker rate: {1/(time.time() - last)}') quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
)
quote['symbol'] = topic
await send_chan.send({topic: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []
@ -1709,11 +1824,11 @@ def pack_position(
symbol = con.localSymbol.replace(' ', '') symbol = con.localSymbol.replace(' ', '')
else: else:
# TODO: lookup fqsn even for derivs.
symbol = con.symbol.lower() symbol = con.symbol.lower()
exch = (con.primaryExchange or con.exchange).lower() exch = (con.primaryExchange or con.exchange).lower()
symkey = '.'.join((symbol, exch)) symkey = '.'.join((symbol, exch))
if not exch: if not exch:
# attempt to lookup the symbol from our # attempt to lookup the symbol from our
# hacked set.. # hacked set..
@ -1722,6 +1837,10 @@ def pack_position(
symkey = sym symkey = sym
break break
expiry = con.lastTradeDateOrContractMonth
if expiry:
symkey += f'.{expiry}'
# TODO: options contracts into a sane format.. # TODO: options contracts into a sane format..
return BrokerdPosition( return BrokerdPosition(
@ -2101,7 +2220,7 @@ async def open_symbol_search(
sn.start_soon( sn.start_soon(
stash_results, stash_results,
_trio_run_client_method( _trio_run_client_method(
method='search_stocks', method='search_symbols',
pattern=pattern, pattern=pattern,
upto=5, upto=5,
) )