Use mem-chans for quote streams; clone for multiple consumers

cached_feeds
Tyler Goodlet 2021-03-23 08:35:11 -04:00
parent 17d3e7a9e2
commit aa61bf5a65
1 changed files with 143 additions and 68 deletions

View File

@ -168,6 +168,7 @@ class Client:
# contract cache # contract cache
self._contracts: Dict[str, Contract] = {} self._contracts: Dict[str, Contract] = {}
self._feeds: Dict[str, trio.abc.SendChannel] = {}
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
@ -387,7 +388,6 @@ class Client:
async def stream_ticker( async def stream_ticker(
self, self,
symbol: str, symbol: str,
to_trio,
opts: Tuple[int] = ('375', '233', '236'), opts: Tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
) -> None: ) -> None:
@ -396,8 +396,16 @@ class Client:
contract = contract or (await self.find_contract(symbol)) contract = contract or (await self.find_contract(symbol))
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
feed = self._feeds.get(symbol)
if feed:
# do something else
# await tractor.breakpoint()
to_trio, from_aio = feed
return from_aio.clone()
# define a simple queue push routine that streams quote packets # define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel. # to trio over the ``to_trio`` memory channel.
to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def push(t): def push(t):
"""Push quotes to trio task. """Push quotes to trio task.
@ -415,10 +423,15 @@ class Client:
log.error(f"Disconnected stream for `{symbol}`") log.error(f"Disconnected stream for `{symbol}`")
self.ib.cancelMktData(contract) self.ib.cancelMktData(contract)
# decouple broadcast mem chan
self._feeds.pop(symbol, None)
ticker.updateEvent.connect(push) ticker.updateEvent.connect(push)
# let the engine run and stream # cache feed for later consumers
await self.ib.disconnectedEvent self._feeds[symbol] = to_trio, from_aio
return from_aio
async def get_quote( async def get_quote(
self, self,
@ -691,13 +704,14 @@ async def _trio_run_client_method(
# if the method is an *async gen* stream for it # if the method is an *async gen* stream for it
meth = getattr(Client, method) meth = getattr(Client, method)
if inspect.isasyncgenfunction(meth):
kwargs['_treat_as_stream'] = True
# if the method is an *async func* but manually
# streams back results, make sure to also stream it
args = tuple(inspect.getfullargspec(meth).args) args = tuple(inspect.getfullargspec(meth).args)
if 'to_trio' in args:
if inspect.isasyncgenfunction(meth) or (
# if the method is an *async func* but manually
# streams back results, make sure to also stream it
'to_trio' in args
):
kwargs['_treat_as_stream'] = True kwargs['_treat_as_stream'] = True
result = await tractor.to_asyncio.run_task( result = await tractor.to_asyncio.run_task(
@ -780,7 +794,7 @@ def normalize(
# convert named tuples to dicts so we send usable keys # convert named tuples to dicts so we send usable keys
new_ticks = [] new_ticks = []
for tick in ticker.ticks: for tick in ticker.ticks:
if tick: if tick and not isinstance(tick, dict):
td = tick._asdict() td = tick._asdict()
td['type'] = tick_types.get(td['tickType'], 'n/a') td['type'] = tick_types.get(td['tickType'], 'n/a')
@ -840,7 +854,7 @@ async def fill_bars(
first_bars: list, first_bars: list,
shm: 'ShmArray', # type: ignore # noqa shm: 'ShmArray', # type: ignore # noqa
# count: int = 20, # NOTE: any more and we'll overrun underlying buffer # count: int = 20, # NOTE: any more and we'll overrun underlying buffer
count: int = 6, # NOTE: any more and we'll overrun the underlying buffer count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
) -> None: ) -> None:
"""Fill historical bars into shared mem / storage afap. """Fill historical bars into shared mem / storage afap.
@ -904,6 +918,62 @@ asset_type_map = {
} }
_quote_streams: Dict[str, trio.abc.ReceiveStream] = {}
async def stream_quotes(
client,
symbol: str,
opts: Tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None,
) -> None:
"""Stream a ticker using the std L1 api.
"""
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
chans = _quote_streams.get(symbol)
if feed:
# if we already have a cached feed deliver a rx side clone to
# consumer
to_trio, from_aio = chans
return from_aio.clone()
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
self.ib.cancelMktData(contract)
# decouple broadcast mem chan
self._feeds.pop(symbol, None)
ticker.updateEvent.connect(push)
# cache feed for later consumers
_quote_streams[symbol] = to_trio, from_aio
return from_aio
# TODO: figure out how to share quote feeds sanely despite # TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api. # the wacky ``ib_insync`` api.
# @tractor.msg.pub # @tractor.msg.pub
@ -1058,6 +1128,7 @@ async def stream_quotes(
# wait for real volume on feed (trading might be closed) # wait for real volume on feed (trading might be closed)
async with aclosing(stream): async with aclosing(stream):
async for ticker in stream: async for ticker in stream:
# for a real volume contract we rait for the first # for a real volume contract we rait for the first
@ -1081,25 +1152,28 @@ async def stream_quotes(
# enter stream loop # enter stream loop
try: try:
await stream_and_write( async with stream:
stream=stream, await stream_and_write(
calc_price=calc_price, stream=stream,
topic=topic, calc_price=calc_price,
writer_already_exists=writer_already_exists, topic=topic,
shm=shm, write_shm=not writer_already_exists,
suffix=suffix, shm=shm,
ctx=ctx, suffix=suffix,
) ctx=ctx,
)
finally: finally:
if not writer_already_exists: if not writer_already_exists:
_local_buffer_writers[key] = False _local_buffer_writers[key] = False
stream.close()
async def stream_and_write( async def stream_and_write(
stream, stream,
calc_price: bool, calc_price: bool,
topic: str, topic: str,
writer_already_exists: bool, write_shm: bool,
suffix: str, suffix: str,
ctx: tractor.Context, ctx: tractor.Context,
shm: Optional['SharedArray'], # noqa shm: Optional['SharedArray'], # noqa
@ -1108,64 +1182,65 @@ async def stream_and_write(
""" """
# real-time stream # real-time stream
async for ticker in stream: async with stream:
async for ticker in stream:
# print(ticker.vwap) # print(ticker.vwap)
quote = normalize( quote = normalize(
ticker, ticker,
calc_price=calc_price calc_price=calc_price
) )
quote['symbol'] = topic quote['symbol'] = topic
# TODO: in theory you can send the IPC msg *before* # TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency, # writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support # however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching # here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed # at the yield such that the array write isn't delayed
# while another consumer is serviced.. # while another consumer is serviced..
# if we are the lone tick writer start writing # if we are the lone tick writer start writing
# the buffer with appropriate trade data # the buffer with appropriate trade data
if not writer_already_exists: if write_shm:
for tick in iterticks(quote, types=('trade', 'utrade',)): for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price'] last = tick['price']
# print(f"{quote['symbol']}: {tick}") # print(f"{quote['symbol']}: {tick}")
# update last entry # update last entry
# benchmarked in the 4-5 us range # benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][ o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume'] ['open', 'high', 'low', 'volume']
] ]
new_v = tick.get('size', 0) new_v = tick.get('size', 0)
if v == 0 and new_v: if v == 0 and new_v:
# no trades for this bar yet so the open # no trades for this bar yet so the open
# is also the close/last trade price # is also the close/last trade price
o = last o = last
shm.array[[ shm.array[[
'open', 'open',
'high', 'high',
'low', 'low',
'close', 'close',
'volume', 'volume',
]][-1] = ( ]][-1] = (
o, o,
max(high, last), max(high, last),
min(low, last), min(low, last),
last, last,
v + new_v, v + new_v,
) )
con = quote['contract'] con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
quote['symbol'] = topic quote['symbol'] = topic
await ctx.send_yield({topic: quote}) await ctx.send_yield({topic: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []
def pack_position(pos: Position) -> Dict[str, Any]: def pack_position(pos: Position) -> Dict[str, Any]: