Include symbol deats in feed init message from ib

Async spawn a contract details getter task whenever we load a symbol data
feed. Pass these symbol details in the first message delivered by the feed at
open. Move the quote stream loop into a new func.
basic_orders
Tyler Goodlet 2021-01-22 17:11:53 -05:00
parent 5327d7be5e
commit 10e47e349c
1 changed file with 173 additions and 128 deletions
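
For context on the message above, a rough sketch of how a feed consumer might
unpack the new first message versus the old (shm_token, is_writer) tuple; the
consumer-side names below are illustrative, not part of this commit:

    # hypothetical subscriber task reading this actor's quote stream
    async def consume_feed(stream, sym: str) -> None:
        init_msgs = await stream.__anext__()  # first msg is now a dict
        init = init_msgs[sym]

        shm_token = init['shm_token']      # token for the OHLC shared array
        is_writer = init['is_shm_writer']  # True if this feed wrote history
        syminfo = init['symbol_info']      # flattened ib ContractDetails
        print(syminfo.get('minTick'), syminfo.get('tradingHours'))

        # all subsequent msgs remain {topic: quote} mappings
        async for quotes in stream:
            for topic, quote in quotes.items():
                ...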


@@ -426,12 +426,15 @@ class Client:
         """
         contract = await self.find_contract(symbol)
+        details_fute = self.ib.reqContractDetailsAsync(contract)
         ticker: Ticker = self.ib.reqMktData(
             contract,
             snapshot=True,
         )
         ticker = await ticker.updateEvent
-        return contract, ticker
+        details = (await details_fute)[0]
+        return contract, ticker, details

     # async to be consistent for the client proxy, and cuz why not.
     async def submit_limit(
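
The details request above is fired before the snapshot is awaited, so the two
round trips to the gateway overlap. A hedged caller-side sketch of the new
3-tuple return of Client.get_quote() (the client variable and symbol string
are illustrative, not from this commit):

    # unpack the new (contract, ticker, details) return
    contract, first_ticker, details = await client.get_quote('mnq.globex')
    # `details` is an ib_insync ContractDetails, e.g. tick size and hours:
    print(details.minTick, details.tradingHours)
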
@@ -440,7 +443,7 @@ class Client:
         symbol: str,
         price: float,
         action: str,
-        size: int = 100,
+        size: int,
     ) -> int:
         """Place an order and return integer request id provided by client.
@@ -870,6 +873,7 @@ async def stream_quotes(
     symbols: List[str],
     shm_token: Tuple[str, str, List[tuple]],
     loglevel: str = None,
+
     # compat for @tractor.msg.pub
     topics: Any = None,
     get_topics: Callable = None,
@@ -885,10 +889,11 @@ async def stream_quotes(
     # TODO: support multiple subscriptions
     sym = symbols[0]

-    contract, first_ticker = await _trio_run_client_method(
-        method='get_quote',
-        symbol=sym,
-    )
+    async with trio.open_nursery() as n:
+        contract, first_ticker, details = await _trio_run_client_method(
+            method='get_quote',
+            symbol=sym,
+        )

     stream = await _trio_run_client_method(
         method='stream_ticker',
@@ -896,8 +901,8 @@ async def stream_quotes(
         symbol=sym,
     )

-    async with aclosing(stream):
+    shm = None
+    async with trio.open_nursery() as ln:

         # check if a writer already is alive in a streaming task,
         # otherwise start one and mark it as now existing
@@ -908,86 +913,100 @@ async def stream_quotes(
         # maybe load historical ohlcv in to shared mem
         # check if shm has already been created by previous
         # feed initialization
-        async with trio.open_nursery() as ln:
-            if not writer_already_exists:
-                _local_buffer_writers[key] = True
+        if not writer_already_exists:
+            _local_buffer_writers[key] = True

             shm = attach_shm_array(
                 token=shm_token,
                 # we are the buffer writer
                 readonly=False,
             )

             # async def retrieve_and_push():
             start = time.time()

             bars, bars_array = await _trio_run_client_method(
                 method='bars',
                 symbol=sym,
             )

             log.info(f"bars_array request: {time.time() - start}")

             if bars_array is None:
                 raise SymbolNotFound(sym)

             # write historical data to buffer
             shm.push(bars_array)
             shm_token = shm.token

             # TODO: generalize this for other brokers
             # start bar filler task in bg
             ln.start_soon(fill_bars, sym, bars, shm)

             times = shm.array['time']
             delay_s = times[-1] - times[times != times[-1]][-1]
             subscribe_ohlc_for_increment(shm, delay_s)

-            # pass back token, and bool, signalling if we're the writer
-            # and that history has been written
-            await ctx.send_yield((shm_token, not writer_already_exists))
+        # pass back some symbol info like min_tick, trading_hours, etc.
+        # con = asdict(contract)
+        # syminfo = contract
+        symdeats = asdict(details)
+        symdeats.update(symdeats['contract'])
+
+        # TODO: for loop through all symbols passed in
+        init_msgs = {
+            # pass back token, and bool, signalling if we're the writer
+            # and that history has been written
+            sym: {
+                'is_shm_writer': not writer_already_exists,
+                'shm_token': shm_token,
+                'symbol_info': symdeats,
+            }
+        }
+        await ctx.send_yield(init_msgs)

         # check for special contract types
         if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
             suffix = 'exchange'
             # should be real volume for this contract
             calc_price = False
         else:
             # commodities and forex don't have an exchange name and
             # no real volume so we have to calculate the price
             suffix = 'secType'
             calc_price = True

         # ticker = first_ticker

         # pass first quote asap
         quote = normalize(first_ticker, calc_price=calc_price)
         con = quote['contract']
         topic = '.'.join((con['symbol'], con[suffix])).lower()
         quote['symbol'] = topic

         first_quote = {topic: quote}

         # yield first quote asap
         await ctx.send_yield(first_quote)

         # ticker.ticks = []

         # ugh, clear ticks since we've consumed them
         # (ahem, ib_insync is stateful trash)
         first_ticker.ticks = []

         log.debug(f"First ticker received {quote}")

         if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
             suffix = 'exchange'
             calc_price = False  # should be real volume for contract

         # with trio.move_on_after(10) as cs:
         # wait for real volume on feed (trading might be closed)
+        async with aclosing(stream):
             async for ticker in stream:

                 # for a real volume contract we rait for the first
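
As an aside on the bar-period inference kept above: delay_s = times[-1] -
times[times != times[-1]][-1] takes the last timestamp minus the most recent
*different* timestamp, so it still yields the bar step even when the tail of
the buffer repeats the final bar time. A tiny standalone check with
illustrative values only:

    import numpy as np

    times = np.array([60.0, 120.0, 180.0, 180.0])  # last bar time duplicated
    delay_s = times[-1] - times[times != times[-1]][-1]
    assert delay_s == 60.0  # seconds per bar
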
@@ -1009,76 +1028,105 @@ async def stream_quotes(
                 # ``aclosing()`` above?
                 break

-        # if cs.cancelled_caught:
-        #     await tractor.breakpoint()
-
-        # real-time stream
-        async for ticker in stream:
-
-            # print(ticker.vwap)
-            quote = normalize(
-                ticker,
-                calc_price=calc_price
-            )
-            quote['symbol'] = topic
-            # TODO: in theory you can send the IPC msg *before*
-            # writing to the sharedmem array to decrease latency,
-            # however, that will require `tractor.msg.pub` support
-            # here or at least some way to prevent task switching
-            # at the yield such that the array write isn't delayed
-            # while another consumer is serviced..
-
-            # if we are the lone tick writer start writing
-            # the buffer with appropriate trade data
-            if not writer_already_exists:
-                for tick in iterticks(quote, types=('trade', 'utrade',)):
-                    last = tick['price']
-
-                    # print(f"{quote['symbol']}: {tick}")
-
-                    # update last entry
-                    # benchmarked in the 4-5 us range
-                    o, high, low, v = shm.array[-1][
-                        ['open', 'high', 'low', 'volume']
-                    ]
-
-                    new_v = tick.get('size', 0)
-
-                    if v == 0 and new_v:
-                        # no trades for this bar yet so the open
-                        # is also the close/last trade price
-                        o = last
-
-                    shm.array[[
-                        'open',
-                        'high',
-                        'low',
-                        'close',
-                        'volume',
-                    ]][-1] = (
-                        o,
-                        max(high, last),
-                        min(low, last),
-                        last,
-                        v + new_v,
-                    )
-
-            con = quote['contract']
-            topic = '.'.join((con['symbol'], con[suffix])).lower()
-            quote['symbol'] = topic
-
-            await ctx.send_yield({topic: quote})
-
-            # ugh, clear ticks since we've consumed them
-            ticker.ticks = []
-
-
-@tractor.msg.pub
+        # enter stream loop
+        try:
+            await stream_and_write(
+                stream=stream,
+                calc_price=calc_price,
+                topic=topic,
+                writer_already_exists=writer_already_exists,
+                shm=shm,
+                suffix=suffix,
+                ctx=ctx,
+            )
+        finally:
+            if not writer_already_exists:
+                _local_buffer_writers[key] = False
+
+
+async def stream_and_write(
+    stream,
+    calc_price: bool,
+    topic: str,
+    writer_already_exists: bool,
+    suffix: str,
+    ctx: tractor.Context,
+    shm: Optional['SharedArray'],  # noqa
+) -> None:
+    """Core quote streaming and shm writing loop; optimize for speed!
+    """
+    # real-time stream
+    async for ticker in stream:
+
+        # print(ticker.vwap)
+        quote = normalize(
+            ticker,
+            calc_price=calc_price
+        )
+        quote['symbol'] = topic
+        # TODO: in theory you can send the IPC msg *before*
+        # writing to the sharedmem array to decrease latency,
+        # however, that will require `tractor.msg.pub` support
+        # here or at least some way to prevent task switching
+        # at the yield such that the array write isn't delayed
+        # while another consumer is serviced..
+
+        # if we are the lone tick writer start writing
+        # the buffer with appropriate trade data
+        if not writer_already_exists:
+            for tick in iterticks(quote, types=('trade', 'utrade',)):
+                last = tick['price']
+
+                # print(f"{quote['symbol']}: {tick}")
+
+                # update last entry
+                # benchmarked in the 4-5 us range
+                o, high, low, v = shm.array[-1][
+                    ['open', 'high', 'low', 'volume']
+                ]
+
+                new_v = tick.get('size', 0)
+
+                if v == 0 and new_v:
+                    # no trades for this bar yet so the open
+                    # is also the close/last trade price
+                    o = last
+
+                shm.array[[
+                    'open',
+                    'high',
+                    'low',
+                    'close',
+                    'volume',
+                ]][-1] = (
+                    o,
+                    max(high, last),
+                    min(low, last),
+                    last,
+                    v + new_v,
+                )
+
+        con = quote['contract']
+        topic = '.'.join((con['symbol'], con[suffix])).lower()
+        quote['symbol'] = topic
+
+        await ctx.send_yield({topic: quote})
+
+        # ugh, clear ticks since we've consumed them
+        ticker.ticks = []
+
+
+@tractor.msg.pub(
+    send_on_connect={'local_trades': 'start'}
+)
 async def stream_trades(
     loglevel: str = None,
     get_topics: Callable = None,
 ) -> AsyncIterator[Dict[str, Any]]:
+
+    global _trades_stream_is_live

     # XXX: required to propagate ``tractor`` loglevel to piker logging
     get_console_log(loglevel or tractor.current_actor().loglevel)
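
The shm write in stream_and_write above folds each trade tick into the last
bar row: the open is only replaced when the bar has no volume yet, the
high/low expand via max/min, the close becomes the last trade price, and
volume accumulates. A hedged sketch of the same fold on plain values (the
helper name and numbers are illustrative, not part of this commit):

    def update_last_bar(o, high, low, close, v, last, new_v):
        # no trades for this bar yet: the open is the first trade price
        if v == 0 and new_v:
            o = last
        return o, max(high, last), min(low, last), last, v + new_v

    # an empty bar seeded at 4001.0 receiving a 4003.25 trade of size 2:
    assert update_last_bar(4001.0, 4001.0, 4001.0, 4001.0, 0, 4003.25, 2) == \
        (4003.25, 4003.25, 4001.0, 4003.25, 2)
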
@@ -1086,9 +1134,6 @@ async def stream_trades(
         method='recv_trade_updates',
     )

-    # startup msg
-    yield {'local_trades': 'start'}
-
     async for event_name, item in stream:

         # XXX: begin normalization of nonsense ib_insync internal