Move quote stream setup into a cacheing func

cached_feeds
Tyler Goodlet 2021-03-23 11:37:27 -04:00
parent aa61bf5a65
commit 0d4073dbd2
1 changed files with 58 additions and 97 deletions

View File

@ -385,54 +385,6 @@ class Client:
formatDate=2, # timezone aware UTC datetime formatDate=2, # timezone aware UTC datetime
) )
async def stream_ticker(
self,
symbol: str,
opts: Tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None,
) -> None:
"""Stream a ticker using the std L1 api.
"""
contract = contract or (await self.find_contract(symbol))
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
feed = self._feeds.get(symbol)
if feed:
# do something else
# await tractor.breakpoint()
to_trio, from_aio = feed
return from_aio.clone()
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
self.ib.cancelMktData(contract)
# decouple broadcast mem chan
self._feeds.pop(symbol, None)
ticker.updateEvent.connect(push)
# cache feed for later consumers
self._feeds[symbol] = to_trio, from_aio
return from_aio
async def get_quote( async def get_quote(
self, self,
symbol: str, symbol: str,
@ -626,6 +578,8 @@ async def _aio_get_client(
client_id: Optional[int] = None, client_id: Optional[int] = None,
) -> Client: ) -> Client:
"""Return an ``ib_insync.IB`` instance wrapped in our client API. """Return an ``ib_insync.IB`` instance wrapped in our client API.
Client instances are cached for later use.
""" """
# first check cache for existing client # first check cache for existing client
@ -665,8 +619,10 @@ async def _aio_get_client(
# create and cache # create and cache
try: try:
client = Client(ib) client = Client(ib)
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}") log.debug(f"Caching client for {(host, port)}")
yield client yield client
except BaseException: except BaseException:
@ -918,31 +874,23 @@ asset_type_map = {
} }
_quote_streams: Dict[str, trio.abc.ReceiveStream] = {} _quote_streams: Dict[str, trio.abc.ReceiveStream] = {}
async def stream_quotes( async def _setup_quote_stream(
client,
symbol: str, symbol: str,
opts: Tuple[int] = ('375', '233', '236'), opts: Tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
global _quote_streams
async with _aio_get_client() as client:
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
chans = _quote_streams.get(symbol)
if feed:
# if we already have a cached feed deliver a rx side clone to
# consumer
to_trio, from_aio = chans
return from_aio.clone()
# define a simple queue push routine that streams quote packets # define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel. # to trio over the ``to_trio`` memory channel.
to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
@ -961,22 +909,43 @@ async def stream_quotes(
# Manually do the dereg ourselves. # Manually do the dereg ourselves.
ticker.updateEvent.disconnect(push) ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`") log.error(f"Disconnected stream for `{symbol}`")
self.ib.cancelMktData(contract) client.ib.cancelMktData(contract)
# decouple broadcast mem chan # decouple broadcast mem chan
self._feeds.pop(symbol, None) _quote_streams.pop(symbol, None)
ticker.updateEvent.connect(push) ticker.updateEvent.connect(push)
# cache feed for later consumers
_quote_streams[symbol] = to_trio, from_aio
return from_aio return from_aio
# TODO: figure out how to share quote feeds sanely despite async def start_aio_quote_stream(
# the wacky ``ib_insync`` api. symbol: str,
# @tractor.msg.pub contract: Optional[Contract] = None,
) -> trio.abc.ReceiveStream:
global _quote_streams
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer
return from_aio.clone()
else:
from_aio = await tractor.to_asyncio.run_task(
_setup_quote_stream,
symbol=symbol,
contract=contract,
)
# cache feed for later consumers
_quote_streams[symbol] = from_aio
return from_aio
@tractor.stream @tractor.stream
async def stream_quotes( async def stream_quotes(
ctx: tractor.Context, ctx: tractor.Context,
@ -1004,11 +973,7 @@ async def stream_quotes(
symbol=sym, symbol=sym,
) )
stream = await _trio_run_client_method( stream = await start_aio_quote_stream(symbol=sym, contract=contract)
method='stream_ticker',
contract=contract, # small speedup
symbol=sym,
)
shm = None shm = None
async with trio.open_nursery() as ln: async with trio.open_nursery() as ln:
@ -1059,8 +1024,6 @@ async def stream_quotes(
subscribe_ohlc_for_increment(shm, delay_s) subscribe_ohlc_for_increment(shm, delay_s)
# pass back some symbol info like min_tick, trading_hours, etc. # pass back some symbol info like min_tick, trading_hours, etc.
# con = asdict(contract)
# syminfo = contract
syminfo = asdict(details) syminfo = asdict(details)
syminfo.update(syminfo['contract']) syminfo.update(syminfo['contract'])
@ -1111,8 +1074,6 @@ async def stream_quotes(
# yield first quote asap # yield first quote asap
await ctx.send_yield(first_quote) await ctx.send_yield(first_quote)
# ticker.ticks = []
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []