Variety of IB backend improvements

- Move to new shared mem system only writing on the first (by process)
  entry to `stream_quotes()`.
- Deliver bars before first quote arrives so that chart can populate and
  then wait for initial arrival.
- Allow caching clients per actor.
- Load bars using the same (cached) client that starts the quote stream
  thus speeding up initialization.
bar_select
Tyler Goodlet 2020-09-22 12:24:02 -04:00
parent b1093dc71d
commit d93ce84a99
1 changed files with 96 additions and 88 deletions

View File

@ -26,8 +26,10 @@ import tractor
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ( from ..data import (
maybe_spawn_brokerd, iterticks, attach_shared_array, maybe_spawn_brokerd,
incr_buffer, iterticks,
attach_shm_array,
get_shm_token
) )
from ..ui._source import from_df from ..ui._source import from_df
@ -145,7 +147,7 @@ class Client:
# durationStr='1 D', # durationStr='1 D',
# time length calcs # time length calcs
durationStr='{count} S'.format(count=5000 * 5), durationStr='{count} S'.format(count=1000 * 5),
barSizeSetting='5 secs', barSizeSetting='5 secs',
# always use extended hours # always use extended hours
@ -311,6 +313,8 @@ class Client:
_tws_port: int = 7497 _tws_port: int = 7497
_gw_port: int = 4002 _gw_port: int = 4002
_try_ports = [_tws_port, _gw_port] _try_ports = [_tws_port, _gw_port]
_client_ids = itertools.count()
_client_cache = {}
@asynccontextmanager @asynccontextmanager
@ -321,36 +325,39 @@ async def _aio_get_client(
) -> Client: ) -> Client:
"""Return an ``ib_insync.IB`` instance wrapped in our client API. """Return an ``ib_insync.IB`` instance wrapped in our client API.
""" """
if client_id is None: # first check cache for existing client
# if this is a persistent brokerd, try to allocate a new id for
# each client
try:
ss = tractor.current_actor().statespace
client_id = next(ss.setdefault('client_ids', itertools.count()))
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcase for one.
except RuntimeError:
# tractor likely isn't running
client_id = 1
ib = NonShittyIB()
ports = _try_ports if port is None else [port]
_err = None
for port in ports:
try:
await ib.connectAsync(host, port, clientId=client_id)
break
except ConnectionRefusedError as ce:
_err = ce
log.warning(f'Failed to connect on {port}')
else:
raise ConnectionRefusedError(_err)
try: try:
yield Client(ib) yield _client_cache[(host, port)]
except BaseException: except KeyError:
ib.disconnect() # TODO: in case the arbiter has no record
raise # of existing brokerd we need to broadcast for one.
if client_id is None:
# if this is a persistent brokerd, try to allocate a new id for
# each client
client_id = next(_client_ids)
ib = NonShittyIB()
ports = _try_ports if port is None else [port]
_err = None
for port in ports:
try:
await ib.connectAsync(host, port, clientId=client_id)
break
except ConnectionRefusedError as ce:
_err = ce
log.warning(f'Failed to connect on {port}')
else:
raise ConnectionRefusedError(_err)
try:
client = Client(ib)
_client_cache[(host, port)] = client
yield client
except BaseException:
ib.disconnect()
raise
async def _aio_run_client_method( async def _aio_run_client_method(
@ -489,7 +496,7 @@ def normalize(
# @tractor.msg.pub # @tractor.msg.pub
async def stream_quotes( async def stream_quotes(
symbols: List[str], symbols: List[str],
shared_array_token: Tuple[str, str], shm_token: Tuple[str, str, List[tuple]],
loglevel: str = None, loglevel: str = None,
# compat for @tractor.msg.pub # compat for @tractor.msg.pub
topics: Any = None, topics: Any = None,
@ -506,16 +513,35 @@ async def stream_quotes(
# TODO: support multiple subscriptions # TODO: support multiple subscriptions
sym = symbols[0] sym = symbols[0]
stream = await tractor.to_asyncio.run_task( stream = await _trio_run_client_method(
_trio_run_client_method,
method='stream_ticker', method='stream_ticker',
symbol=sym, symbol=sym,
) )
async with get_client() as client:
bars = await client.bars(symbol=sym)
async with aclosing(stream): async with aclosing(stream):
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
writer_exists = get_shm_token(shm_token['shm_name'])
if not writer_exists:
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await _trio_run_client_method(
method='bars',
symbol=sym,
)
shm.push(bars)
shm_token = shm.token
# pass back token, and bool, signalling if we're the writer
yield shm_token, not writer_exists
# first quote can be ignored as a 2nd with newer data is sent? # first quote can be ignored as a 2nd with newer data is sent?
first_ticker = await stream.__anext__() first_ticker = await stream.__anext__()
quote = normalize(first_ticker) quote = normalize(first_ticker)
@ -538,7 +564,7 @@ async def stream_quotes(
else: else:
log.debug("Received first real volume tick") log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is truly stateful trash)
ticker.ticks = [] ticker.ticks = []
# XXX: this works because we don't use # XXX: this works because we don't use
@ -555,58 +581,40 @@ async def stream_quotes(
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
first_quote = {topic: quote} first_quote = {topic: quote}
ticker.ticks = [] ticker.ticks = []
# yield first quote asap
yield first_quote
# load historical ohlcv in to shared mem async for ticker in stream:
ss = tractor.current_actor().statespace quote = normalize(
existing_shm = ss.get(f'ib_shm.{sym}') ticker,
if not existing_shm: calc_price=calc_price
readonly = False )
else: # TODO: in theory you can send the IPC msg *before*
readonly = True # writing to the sharedmem array to decrease latency,
shm = existing_shm # however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
with attach_shared_array( # if we are the lone tick writer start writing
token=shared_array_token, # the buffer with appropriate trade data
readonly=readonly if not writer_exists:
) as shm: for tick in iterticks(quote, type='trade'):
if not existing_shm: last = tick['price']
shm.push(bars) # print(f'broker last: {tick}')
ss[f'ib_shm.{sym}'] = shm
yield (first_quote, shm.token) # update last entry
else: # benchmarked in the 4-5 us range
yield (first_quote, None) high, low = shm.array[-1][['high', 'low']]
shm.array[['high', 'low', 'close']][-1] = (
max(high, last),
min(low, last),
last,
)
async for ticker in stream: con = quote['contract']
quote = normalize( topic = '.'.join((con['symbol'], con[suffix])).lower()
ticker, yield {topic: quote}
calc_price=calc_price
)
# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# if we are the lone tick writer # ugh, clear ticks since we've consumed them
if not existing_shm: ticker.ticks = []
for tick in iterticks(quote, type='trade'):
last = tick['price']
# print(f'broker last: {tick}')
# update last entry
# benchmarked in the 4-5 us range
high, low = shm.array[-1][['high', 'low']]
shm.array[['high', 'low', 'close']][-1] = (
max(high, last),
min(low, last),
last,
)
con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower()
yield {topic: quote}
# ugh, clear ticks since we've consumed them
ticker.ticks = []