Support "expiry" suffixes for derivatives with ib

To start we only have futes working but this allows both searching
and loading multiple expiries of the same instrument by specifying
different expiries with a `.<expiry>` suffix in the symbol key (eg.
`mnq.globex.20220617`). This also paves the way for options contracts
which will need something similar plus a strike property. This change
set also required a patch to `ib_insync` to allow retrieving multiple
"ambiguous" contracts from the `IB.reqContractDetailsAcync()` method,
see https://github.com/erdewit/ib_insync/pull/454 for further discussion
since the approach here might change.

This patch also includes a lot of serious reworking of some `trio`-`asyncio`
integration to use the newer `tractor.to_asyncio.open_channel_from()`
api and use it (with a relay task) to open a persistent connection with
an in-actor `ib_insync` `Client` mostly for history requests.

Deats,
- annot the module with a `_infect_asyncio: bool` for `tractor` spawning
- add a futes venu list
- support ambiguous futes contracts lookups so that all expiries will
  show in search
- support both continuous and specific expiry fute contract
  qualification
- allow searching with "fqsn" keys
- don't crash on "data not found" errors in history requests
- move all quotes msg "topic-key" generation (which should now be
  a broker-specific fqsn) and per-contract quote processing into
  `normalize()`
- set the fqsn key in the symbol info init msg
- use `open_client_proxy()` in bars backfiller endpoint
- include expiry suffix in position update keys
broker_bumpz
Tyler Goodlet 2022-03-18 09:25:39 -04:00
parent 937406534c
commit 1e433ca4f4
1 changed files with 261 additions and 146 deletions

View File

@ -60,7 +60,6 @@ import numpy as np
from .. import config
from ..log import get_logger, get_console_log
# from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df
from ..data._sharedmem import ShmArray
from ._util import SymbolNotFound, NoData
@ -116,9 +115,14 @@ _search_conf = {
}
# annotation to let backend agnostic code
# know if ``brokerd`` should be spawned with
# ``tractor``'s aio mode.
_infect_asyncio: bool = True
# overrides to sidestep pretty questionable design decisions in
# ``ib_insync``:
class NonShittyWrapper(Wrapper):
def tcpDataArrived(self):
"""Override time stamps to be floats for now.
@ -173,6 +177,13 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
_futes_venues = (
'GLOBEX',
'NYMEX',
'CME',
'CMECRYPTO',
)
_adhoc_futes_set = {
# equities
@ -243,7 +254,7 @@ class Client:
async def bars(
self,
symbol: str,
fqsn: str,
# EST in ISO 8601 format is required... below is EPOCH
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
end_dt: Union[datetime, str] = "",
@ -254,7 +265,7 @@ class Client:
is_paid_feed: bool = False, # placeholder
) -> list[dict[str, Any]]:
'''
Retreive OHLCV bars for a symbol over a range to the present.
Retreive OHLCV bars for a fqsn over a range to the present.
'''
bars_kwargs = {'whatToShow': 'TRADES'}
@ -263,7 +274,7 @@ class Client:
print(f'ENTER BARS {_enters} @ end={end_dt}')
_enters += 1
contract = await self.find_contract(symbol)
contract = await self.find_contract(fqsn)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count)
@ -300,7 +311,7 @@ class Client:
)
if not bars:
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?")
raise ValueError(f"No bars retreived for {fqsn}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe:
@ -342,23 +353,24 @@ class Client:
async def search_stocks(
self,
pattern: str,
get_details: bool = False,
# how many contracts to search "up to"
upto: int = 3,
upto: int = 3, # how many contracts to search "up to"
) -> dict[str, ContractDetails]:
"""Search for stocks matching provided ``str`` pattern.
'''
Search for stocks matching provided ``str`` pattern.
Return a dictionary of ``upto`` entries worth of contract details.
"""
'''
descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
if descriptions is not None:
descrs = descriptions[:upto]
if get_details:
return await self.con_deats([d.contract for d in descrs])
deats = await self.con_deats([d.contract for d in descrs])
return deats
else:
results = {}
@ -368,6 +380,10 @@ class Client:
# from search?
exch = con.primaryExchange.rsplit('.')[0]
unique_sym = f'{con.symbol}.{exch}'
expiry = con.lastTradeDateOrContractMonth
if expiry:
unique_sym += f'{expiry}'
results[unique_sym] = {}
return results
@ -385,26 +401,75 @@ class Client:
# TODO add search though our adhoc-locally defined symbol set
# for futes/cmdtys/
return await self.search_stocks(pattern, upto, get_details=True)
results = await self.search_stocks(
pattern,
upto=upto,
get_details=True,
)
async def get_cont_fute(
for key, contracts in results.copy().items():
tract = contracts['contract']
sym = tract['symbol']
sectype = tract['secType']
if sectype == 'IND':
results[f'{sym}.IND'] = tract
results.pop(key)
exch = tract['exchange']
if exch in _futes_venues:
# try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = Contract(
'FUT+CONTFUT',
symbol=sym,
exchange=exch,
)
possibles = await self.ib.qualifyContractsAsync(con)
for i, condict in enumerate(sorted(
map(asdict, possibles),
# sort by expiry
key=lambda con: con['lastTradeDateOrContractMonth'],
)):
expiry = condict['lastTradeDateOrContractMonth']
results[f'{sym}.{exch}.{expiry}'] = condict
return results
async def get_fute(
self,
symbol: str,
exchange: str,
) -> Contract:
"""Get an unqualifed contract for the current "continous" future.
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)
expiry: str = '',
front: bool = False,
) -> Contract:
'''
Get an unqualifed contract for the current "continous" future.
'''
# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId)
if front:
con = (await self.ib.qualifyContractsAsync(
ibis.ContFuture(symbol, exchange=exchange)
))[0]
else:
con = (await self.ib.qualifyContractsAsync(
ibis.Future(
symbol,
exchange=exchange,
lastTradeDateOrContractMonth=expiry,
)
))[0]
return con
async def find_contract(
self,
symbol,
pattern: str,
currency: str = 'USD',
**kwargs,
) -> Contract:
# TODO: we can't use this currently because
@ -418,11 +483,20 @@ class Client:
# XXX UPDATE: we can probably do the tick/trades scraping
# inside our eventkit handler instead to bypass this entirely?
if 'ib' in pattern:
from ..data._source import uncons_fqsn
broker, symbol, expiry = uncons_fqsn(pattern)
else:
symbol = pattern
# try:
# # give the cache a go
# return self._contracts[symbol]
# except KeyError:
# log.debug(f'Looking up contract for {symbol}')
expiry: str = ''
if symbol.count('.') > 1:
symbol, _, expiry = symbol.rpartition('.')
# use heuristics to figure out contract "type"
try:
@ -431,9 +505,27 @@ class Client:
# likely there's an embedded `.` for a forex pair
breakpoint()
qualify: bool = True
# futes
if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'):
con = await self.get_cont_fute(symbol=sym, exchange=exch)
if exch in _futes_venues:
if expiry:
# get the "front" contract
contract = await self.get_fute(
symbol=sym,
exchange=exch,
expiry=expiry,
)
else:
# get the "front" contract
contract = await self.get_fute(
symbol=sym,
exchange=exch,
front=True,
)
qualify = False
elif exch in ('FOREX'):
currency = ''
@ -473,12 +565,15 @@ class Client:
)
try:
exch = 'SMART' if not exch else exch
if qualify:
contract = (await self.ib.qualifyContractsAsync(con))[0]
else:
assert contract
except IndexError:
raise ValueError(f"No contract could be found {con}")
self._contracts[symbol] = contract
self._contracts[pattern] = contract
return contract
async def get_head_time(
@ -828,8 +923,8 @@ async def load_aio_clients(
accounts_found: dict[str, Client] = {}
if (
client and client.ib.isConnected() or
sockaddr in _scan_ignore
client and client.ib.isConnected()
or sockaddr in _scan_ignore
):
continue
@ -1039,8 +1134,12 @@ async def open_aio_client_method_relay(
# relay all method requests to ``asyncio``-side client and
# deliver back results
while True:
while not to_trio._closed:
msg = await from_trio.get()
if msg is None:
print('asyncio PROXY-RELAY SHUTDOWN')
break
meth_name, kwargs = msg
meth = getattr(client, meth_name)
@ -1071,8 +1170,17 @@ async def open_client_proxy() -> MethodProxy:
yield proxy
except RequestError as err:
code, msg = err.code, err.message
# terminate asyncio side task
await chan.send(None)
except (
RequestError,
BaseException,
)as err:
code = getattr(err, 'code', None)
if code:
msg = err.message
await tractor.breakpoint()
# TODO: retreive underlying ``ib_insync`` error?
if (
@ -1080,6 +1188,7 @@ async def open_client_proxy() -> MethodProxy:
'HMDS query returned no data' in msg
or 'No market data permissions for' in msg
)
or code == 200
):
# these cases should not cause a task crash
log.warning(msg)
@ -1088,28 +1197,20 @@ async def open_client_proxy() -> MethodProxy:
raise
# @acm
# async def get_client(
# **kwargs,
@acm
async def get_client(
**kwargs,
# ) -> Client:
# '''
# Init the ``ib_insync`` client in another actor and return
# a method proxy to it.
) -> Client:
'''
Init the ``ib_insync`` client in another actor and return
a method proxy to it.
# '''
# async with (
# maybe_spawn_brokerd(
# brokername='ib',
# infect_asyncio=True,
# **kwargs
# ) as portal,
# ):
# assert 0
'''
# TODO: the IPC via portal relay layer for when this current
# actor isn't in aio mode.
# open_client_proxy() as proxy,
# yield proxy
async with open_client_proxy() as proxy:
yield proxy
# https://interactivebrokers.github.io/tws-api/tick_types.html
@ -1137,11 +1238,40 @@ tick_types = {
}
# TODO: cython/mypyc/numba this!
def normalize(
ticker: Ticker,
calc_price: bool = False
) -> dict:
# should be real volume for this contract by default
calc_price = False
# check for special contract types
con = ticker.contract
if type(con) in (
ibis.Commodity,
ibis.Forex,
):
# commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
# no real volume on this tract
calc_price = True
else:
suffix = con.primaryExchange
if not suffix:
suffix = con.exchange
# append a `.<suffix>` to the returned symbol
# key for derivatives that normally is the expiry
# date key.
expiry = con.lastTradeDateOrContractMonth
if expiry:
suffix += f'.{expiry}'
# convert named tuples to dicts so we send usable keys
new_ticks = []
for tick in ticker.ticks:
@ -1170,6 +1300,12 @@ def normalize(
# serialize for transport
data = asdict(ticker)
# generate fqsn with possible specialized suffix
# for derivatives.
data['symbol'] = data['fqsn'] = '.'.join(
(con.symbol, suffix)
).lower()
# convert named tuples to dicts for transport
tbts = data.get('tickByTicks')
if tbts:
@ -1191,7 +1327,7 @@ def normalize(
async def get_bars(
proxy: MethodProxy,
sym: str,
fqsn: str,
end_dt: str = "",
) -> (dict, np.ndarray):
@ -1204,15 +1340,15 @@ async def get_bars(
fails = 0
bars: Optional[list] = None
for _ in range(2):
for _ in range(3):
try:
bars, bars_array = await proxy.bars(
symbol=sym,
fqsn=fqsn,
end_dt=end_dt,
)
if bars_array is None:
raise SymbolNotFound(sym)
raise SymbolNotFound(fqsn)
next_dt = bars[0].date
print(f'ib datetime {next_dt}')
@ -1252,7 +1388,7 @@ async def get_bars(
elif 'No market data permissions for' in err.message:
# TODO: signalling for no permissions searches
raise NoData(f'Symbol: {sym}')
raise NoData(f'Symbol: {fqsn}')
break
else:
@ -1311,7 +1447,7 @@ async def open_history_client(
async def backfill_bars(
sym: str,
fqsn: str,
shm: ShmArray, # type: ignore # noqa
# TODO: we want to avoid overrunning the underlying shm array buffer
@ -1331,7 +1467,9 @@ async def backfill_bars(
https://github.com/pikers/piker/issues/128
'''
# async with open_history_client(sym) as proxy:
with trio.CancelScope() as cs:
# async with open_history_client(fqsn) as proxy:
async with open_client_proxy() as proxy:
if platform.system() == 'Windows':
@ -1339,7 +1477,7 @@ async def backfill_bars(
'Decreasing history query count to 4 since, windows...')
count = 4
out, fails = await get_bars(proxy, sym)
out, fails = await get_bars(proxy, fqsn)
if out is None:
raise RuntimeError("Could not pull currrent history?!")
@ -1351,14 +1489,12 @@ async def backfill_bars(
# write historical data to buffer
shm.push(bars_array)
with trio.CancelScope() as cs:
task_status.started(cs)
i = 0
while i < count:
out, fails = await get_bars(proxy, sym, end_dt=next_dt)
out, fails = await get_bars(proxy, fqsn, end_dt=next_dt)
if fails is None or fails > 1:
break
@ -1430,8 +1566,10 @@ async def _setup_quote_stream(
contract: Optional[Contract] = None,
) -> trio.abc.ReceiveChannel:
"""Stream a ticker using the std L1 api.
"""
'''
Stream a ticker using the std L1 api.
'''
global _quote_streams
to_trio.send_nowait(None)
@ -1519,7 +1657,10 @@ async def open_aio_quote_stream(
if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer
async with broadcast_receiver(from_aio) as from_aio:
async with broadcast_receiver(
from_aio,
2**6,
) as from_aio:
yield from_aio
return
@ -1555,17 +1696,13 @@ async def stream_quotes(
'''
# TODO: support multiple subscriptions
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
contract, first_ticker, details = await _trio_run_client_method(
con, first_ticker, details = await _trio_run_client_method(
method='get_sym_details',
symbol=sym,
)
with trio.move_on_after(1):
contract, first_ticker, details = await _trio_run_client_method(
method='get_quote',
symbol=sym,
)
first_quote = normalize(first_ticker)
def mk_init_msgs() -> dict[str, dict]:
# pass back some symbol info like min_tick, trading_hours, etc.
@ -1593,41 +1730,18 @@ async def stream_quotes(
# and that history has been written
sym: {
'symbol_info': syminfo,
'fqsn': first_quote['fqsn'],
}
}
return init_msgs
init_msgs = mk_init_msgs()
con = first_ticker.contract
# should be real volume for this contract by default
calc_price = False
# check for special contract types
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
suffix = con.primaryExchange
if not suffix:
suffix = con.exchange
else:
# commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
# no real volume on this tract
calc_price = True
quote = normalize(first_ticker, calc_price=calc_price)
con = quote['contract']
topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic
# for compat with upcoming fqsn based derivs search
init_msgs[sym]['fqsn'] = topic
# pass first quote asap
first_quote = quote
with trio.move_on_after(1):
contract, first_ticker, details = await _trio_run_client_method(
method='get_quote',
symbol=sym,
)
# it might be outside regular trading hours so see if we can at
# least grab history.
@ -1643,30 +1757,32 @@ async def stream_quotes(
return # we never expect feed to come up?
async with open_aio_quote_stream(
symbol=sym, contract=contract
symbol=sym,
contract=con,
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
log.debug(f"First ticker received {quote}")
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
# suffix = 'exchange'
# calc_price = False # should be real volume for contract
if type(first_ticker.contract) not in (
ibis.Commodity,
ibis.Forex
):
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we rait for the first
# "real" trade to take place
if not calc_price and not ticker.rtTime:
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
@ -1681,21 +1797,16 @@ async def stream_quotes(
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
feed_is_live.set()
# last = time.time()
async for ticker in stream:
# print(f'ticker rate: {1/(time.time() - last)}')
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
)
quote['symbol'] = topic
await send_chan.send({topic: quote})
quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
@ -1713,11 +1824,11 @@ def pack_position(
symbol = con.localSymbol.replace(' ', '')
else:
# TODO: lookup fqsn even for derivs.
symbol = con.symbol.lower()
exch = (con.primaryExchange or con.exchange).lower()
symkey = '.'.join((symbol, exch))
if not exch:
# attempt to lookup the symbol from our
# hacked set..
@ -1726,6 +1837,10 @@ def pack_position(
symkey = sym
break
expiry = con.lastTradeDateOrContractMonth
if expiry:
symkey += f'.{expiry}'
# TODO: options contracts into a sane format..
return BrokerdPosition(
@ -2105,7 +2220,7 @@ async def open_symbol_search(
sn.start_soon(
stash_results,
_trio_run_client_method(
method='search_stocks',
method='search_symbols',
pattern=pattern,
upto=5,
)