Graph snap quote immediately on ib spin up

basic_alerts
Tyler Goodlet 2021-01-04 14:43:59 -05:00
parent c030b63101
commit c7ff0804db
1 changed file with 89 additions and 45 deletions


@@ -49,9 +49,11 @@ from ..data import (
     attach_shm_array,
     # get_shm_token,
     subscribe_ohlc_for_increment,
+    _buffer,
 )
 from ..data._source import from_df
 from ._util import SymbolNotFound
+from .._async_utils import maybe_with_if

 log = get_logger(__name__)
@@ -355,11 +357,12 @@ class Client:
         symbol: str,
         to_trio,
         opts: Tuple[int] = ('375', '233',),
+        contract: Optional[Contract] = None,
         # opts: Tuple[int] = ('459',),
     ) -> None:
         """Stream a ticker using the std L1 api.
         """
-        contract = await self.find_contract(symbol)
+        contract = contract or (await self.find_contract(symbol))
         ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))

         # define a simple queue push routine that streams quote packets
@@ -386,6 +389,20 @@ class Client:
         # let the engine run and stream
         await self.ib.disconnectedEvent

+    async def get_quote(
+        self,
+        symbol: str,
+    ) -> Ticker:
+        """Return a single quote for symbol.
+        """
+        contract = await self.find_contract(symbol)
+
+        ticker: Ticker = self.ib.reqMktData(
+            contract,
+            snapshot=True,
+        )
+
+        return contract, (await ticker.updateEvent)


 # default config ports
 _tws_port: int = 7497
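The new `get_quote()` is essentially a thin wrapper around ib_insync's snapshot request. A minimal standalone sketch of the same pattern using plain ib_insync; the connection details and the EURUSD contract below are illustrative assumptions, not part of the commit:

    import ib_insync as ibis

    ib = ibis.IB()
    ib.connect('127.0.0.1', 7497, clientId=1)  # assumed local TWS paper-trading port

    contract = ibis.Forex('EURUSD')
    ticker = ib.reqMktData(contract, snapshot=True)

    # block briefly so the snapshot tick can arrive, then read it
    ib.sleep(2)
    print(ticker.marketPrice())

    ib.disconnect()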
@@ -604,16 +621,21 @@ _local_buffer_writers = {}

 @asynccontextmanager
 async def activate_writer(key: str) -> (bool, trio.Nursery):
+    """Mark the current actor with a module var determining
+    whether an existing shm writer task is already active.
+
+    This avoids more than one writer resulting in data
+    clobbering.
+    """
+    global _local_buffer_writers
     try:
-        writer_already_exists = _local_buffer_writers.get(key, False)
-        if not writer_already_exists:
-            _local_buffer_writers[key] = True
-            async with trio.open_nursery() as n:
-                yield writer_already_exists, n
-        else:
-            yield writer_already_exists, None
+        assert not _local_buffer_writers.get(key, False)
+        _local_buffer_writers[key] = True
+
+        async with trio.open_nursery() as n:
+            yield n
     finally:
         _local_buffer_writers.pop(key, None)
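After this change `activate_writer()` no longer reports back whether a writer exists; it asserts exclusivity for the key and hands the caller a nursery, always releasing the key on exit. A stripped-down, self-contained model of that guard (the 'ohlc.spy' key and the sleep stand-in task are illustrative only):

    from contextlib import asynccontextmanager
    import trio

    _writers = {}

    @asynccontextmanager
    async def activate_writer(key: str):
        try:
            # blow up if a writer task is already registered for this key
            assert not _writers.get(key, False)
            _writers[key] = True
            async with trio.open_nursery() as n:
                yield n
        finally:
            # always release the key, even on error
            _writers.pop(key, None)

    async def main():
        async with activate_writer('ohlc.spy') as nursery:
            nursery.start_soon(trio.sleep, 0.1)  # stand-in for a shm writer task

    trio.run(main)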
@@ -622,7 +644,7 @@ async def fill_bars(
     sym: str,
     first_bars: list,
     shm: 'ShmArray',  # type: ignore # noqa
-    # count: int = 20,  # NOTE: any more and we'll overrun the underlying buffer
+    # count: int = 20,  # NOTE: any more and we'll overrun underlying buffer
     count: int = 2,  # NOTE: any more and we'll overrun the underlying buffer
 ) -> None:
     """Fill historical bars into shared mem / storage afap.
@@ -692,8 +714,14 @@ async def stream_quotes(
     # TODO: support multiple subscriptions
     sym = symbols[0]

+    contract, first_ticker = await _trio_run_client_method(
+        method='get_quote',
+        symbol=sym,
+    )
+
     stream = await _trio_run_client_method(
         method='stream_ticker',
+        contract=contract,  # small speedup
         symbol=sym,
     )
@@ -701,14 +729,17 @@ async def stream_quotes(
     # check if a writer already is alive in a streaming task,
     # otherwise start one and mark it as now existing
-    async with activate_writer(
-        shm_token['shm_name']
-    ) as (writer_already_exists, ln):
+    key = shm_token['shm_name']

-        # maybe load historical ohlcv in to shared mem
-        # check if shm has already been created by previous
-        # feed initialization
+    writer_already_exists = _local_buffer_writers.get(key, False)
+
+    # maybe load historical ohlcv in to shared mem
+    # check if shm has already been created by previous
+    # feed initialization
+    async with trio.open_nursery() as ln:
         if not writer_already_exists:
+            _local_buffer_writers[key] = True

             shm = attach_shm_array(
                 token=shm_token,
@@ -744,12 +775,33 @@ async def stream_quotes(
             subscribe_ohlc_for_increment(shm, delay_s)

         # pass back token, and bool, signalling if we're the writer
+        # and that history has been written
         await ctx.send_yield((shm_token, not writer_already_exists))

-        # first quote can be ignored as a 2nd with newer data is sent?
-        first_ticker = await stream.__anext__()
+        # check for special contract types
+        if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
+            suffix = 'exchange'

-        quote = normalize(first_ticker)
+            # should be real volume for this contract
+            calc_price = False
+
+        else:
+            # commodities and forex don't have an exchange name and
+            # no real volume so we have to calculate the price
+            suffix = 'secType'
+            calc_price = True
+            ticker = first_ticker
+
+        # pass first quote asap
+        quote = normalize(first_ticker, calc_price=calc_price)
+        con = quote['contract']
+        topic = '.'.join((con['symbol'], con[suffix])).lower()
+        quote['symbol'] = topic
+
+        first_quote = {topic: quote}
+
+        # yield first quote asap
+        await ctx.send_yield(first_quote)
+
+        # ticker.ticks = []

         # ugh, clear ticks since we've consumed them
         # (ahem, ib_insync is stateful trash)
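The topic key for the first (and every subsequent) quote is just the contract's symbol joined with either its exchange (regular contracts) or its security type (commodities/forex), lowercased. A tiny self-contained illustration with made-up contract values:

    # hypothetical normalized-quote contract fields for a futures vs. a forex contract
    fut_con = {'symbol': 'ES', 'exchange': 'GLOBEX', 'secType': 'FUT'}
    fx_con = {'symbol': 'EUR', 'exchange': '', 'secType': 'CASH'}

    print('.'.join((fut_con['symbol'], fut_con['exchange'])).lower())  # -> 'es.globex'
    print('.'.join((fx_con['symbol'], fx_con['secType'])).lower())     # -> 'eur.cash'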
@@ -762,39 +814,31 @@ async def stream_quotes(
             calc_price = False  # should be real volume for contract

+            # wait for real volume on feed (trading might be closed)
             async for ticker in stream:
-                # spin consuming tickers until we get a real market datum
-                if not ticker.rtTime:
-                    log.debug(f"New unsent ticker: {ticker}")
-                    continue
-                else:
-                    log.debug("Received first real volume tick")
-                    # ugh, clear ticks since we've consumed them
-                    # (ahem, ib_insync is truly stateful trash)
-                    ticker.ticks = []
+
+                # for a real volume contract we wait for the first
+                # "real" trade to take place
+                if not calc_price and not ticker.rtTime:
+                    # spin consuming tickers until we get a real market datum
+                    log.debug(f"New unsent ticker: {ticker}")
+                    continue
+                else:
+                    log.debug("Received first real volume tick")
+                    # ugh, clear ticks since we've consumed them
+                    # (ahem, ib_insync is truly stateful trash)
+                    ticker.ticks = []

-                    # XXX: this works because we don't use
-                    # ``aclosing()`` above?
-                    break
-        else:
-            # commodities don't have an exchange name for some reason?
-            suffix = 'secType'
-            calc_price = True
-            ticker = first_ticker
-
-        quote = normalize(ticker, calc_price=calc_price)
-        con = quote['contract']
-        topic = '.'.join((con['symbol'], con[suffix])).lower()
-        quote['symbol'] = topic
-
-        first_quote = {topic: quote}
-        ticker.ticks = []
-
-        # yield first quote asap
-        await ctx.send_yield(first_quote)
+                    # tell incrementer task it can start
+                    _buffer.shm_incrementing(key).set()
+
+                    # XXX: this works because we don't use
+                    # ``aclosing()`` above?
+                    break

         # real-time stream
         async for ticker in stream:
             # print(ticker.vwap)
             quote = normalize(
                 ticker,