Use `tractor.to_asyncio.open_channel_from()` in ib backend

vlm_plotz_backup
Tyler Goodlet 2021-10-22 12:06:00 -04:00
parent 980a6dde05
commit af0503956a
1 changed files with 383 additions and 300 deletions

View File

@ -21,7 +21,7 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``. ``infected_aio==True``.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from dataclasses import asdict from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
@ -570,66 +570,6 @@ class Client:
) )
) )
async def recv_trade_updates(
self,
to_trio: trio.abc.SendChannel,
) -> None:
"""Stream a ticker using the std L1 api.
"""
self.inline_errors(to_trio)
def push_tradesies(eventkit_obj, obj, fill=None):
"""Push events to trio task.
"""
if fill is not None:
# execution details event
item = ('fill', (obj, fill))
elif eventkit_obj.name() == 'positionEvent':
item = ('position', obj)
else:
item = ('status', obj)
log.info(f'eventkit event ->\n{pformat(item)}')
try:
to_trio.send_nowait(item)
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api
for ev_name in [
'orderStatusEvent', # all order updates
'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
# XXX: not sure yet if we need these
# 'updatePortfolioEvent',
# XXX: these all seem to be weird ib_insync intrernal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
# 'orderModifyEvent',
# 'cancelOrderEvent',
# 'openOrderEvent',
]:
eventkit_obj = getattr(self.ib, ev_name)
handler = partial(push_tradesies, eventkit_obj)
eventkit_obj.connect(handler)
# let the engine run and stream
await self.ib.disconnectedEvent
def inline_errors( def inline_errors(
self, self,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
@ -675,6 +615,71 @@ class Client:
return self.ib.positions(account=account) return self.ib.positions(account=account)
async def recv_trade_updates(
client: Client,
to_trio: trio.abc.SendChannel,
) -> None:
"""Stream a ticker using the std L1 api.
"""
client.inline_errors(to_trio)
# sync with trio task
to_trio.send_nowait(None)
def push_tradesies(eventkit_obj, obj, fill=None):
"""Push events to trio task.
"""
if fill is not None:
# execution details event
item = ('fill', (obj, fill))
elif eventkit_obj.name() == 'positionEvent':
item = ('position', obj)
else:
item = ('status', obj)
log.info(f'eventkit event ->\n{pformat(item)}')
try:
to_trio.send_nowait(item)
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api
for ev_name in [
'orderStatusEvent', # all order updates
'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
# XXX: not sure yet if we need these
# 'updatePortfolioEvent',
# XXX: these all seem to be weird ib_insync intrernal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
# 'orderModifyEvent',
# 'cancelOrderEvent',
# 'openOrderEvent',
]:
eventkit_obj = getattr(client.ib, ev_name)
handler = partial(push_tradesies, eventkit_obj)
eventkit_obj.connect(handler)
# let the engine run and stream
await client.ib.disconnectedEvent
# default config ports # default config ports
_tws_port: int = 7497 _tws_port: int = 7497
_gw_port: int = 4002 _gw_port: int = 4002
@ -705,7 +710,7 @@ def get_config() -> dict[str, Any]:
_accounts2clients: dict[str, Client] = {} _accounts2clients: dict[str, Client] = {}
@asynccontextmanager @acm
async def load_aio_clients( async def load_aio_clients(
host: str = '127.0.0.1', host: str = '127.0.0.1',
@ -756,12 +761,12 @@ async def load_aio_clients(
accounts_def = config.load_accounts(['ib']) accounts_def = config.load_accounts(['ib'])
try_ports = list(ports.values()) try_ports = list(ports.values())
ports = try_ports if port is None else [port] ports = try_ports if port is None else [port]
we_connected = [] # we_connected = []
connect_timeout = 0.5 if platform.system() != 'Windows' else 1 connect_timeout = 0.5 if platform.system() != 'Windows' else 1
combos = list(itertools.product(hosts, ports)) combos = list(itertools.product(hosts, ports))
# allocate new and/or reload disconnected but cached clients # allocate new and/or reload disconnected but cached clients
try: # try:
# TODO: support multiple clients allowing for execution on # TODO: support multiple clients allowing for execution on
# multiple accounts (including a paper instance running on the # multiple accounts (including a paper instance running on the
# same machine) and switching between accounts in the EMs # same machine) and switching between accounts in the EMs
@ -785,9 +790,9 @@ async def load_aio_clients(
try: try:
ib = NonShittyIB() ib = NonShittyIB()
# if this is a persistent brokerd, try to allocate # XXX: not sure if we ever really need to increment the
# a new id for each client # client id if teardown is sucessful.
client_id = next(_client_ids) client_id = 616
await ib.connectAsync( await ib.connectAsync(
host, host,
@ -831,7 +836,20 @@ async def load_aio_clients(
# update all actor-global caches # update all actor-global caches
log.info(f"Caching client for {(host, port)}") log.info(f"Caching client for {(host, port)}")
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
we_connected.append(client)
# we_connected.append((host, port, client))
# TODO: don't do it this way, get a gud to_asyncio
# context / .start() system goin..
def pop_and_discon():
log.info(f'Disconnecting client {client}')
client.ib.disconnect()
_client_cache.pop((host, port), None)
# NOTE: the above callback **CAN'T FAIL** or shm won't get
# torn down correctly ...
tractor._actor._lifetime_stack.callback(pop_and_discon)
_accounts2clients.update(accounts_found) _accounts2clients.update(accounts_found)
except ( except (
@ -865,10 +883,14 @@ async def load_aio_clients(
yield client, _client_cache, _accounts2clients yield client, _client_cache, _accounts2clients
except BaseException: # TODO: this in a way that works xD
for client in we_connected: # finally:
client.ib.disconnect() # pass
raise # # async with trio.CancelScope(shield=True):
# for host, port, client in we_connected:
# client.ib.disconnect()
# _client_cache.pop((host, port))
# raise
async def _aio_run_client_method( async def _aio_run_client_method(
@ -909,16 +931,16 @@ async def _trio_run_client_method(
assert ca.is_infected_aio() assert ca.is_infected_aio()
# if the method is an *async gen* stream for it # if the method is an *async gen* stream for it
meth = getattr(Client, method) # meth = getattr(Client, method)
args = tuple(inspect.getfullargspec(meth).args) # args = tuple(inspect.getfullargspec(meth).args)
if inspect.isasyncgenfunction(meth) or ( # if inspect.isasyncgenfunction(meth) or (
# if the method is an *async func* but manually # # if the method is an *async func* but manually
# streams back results, make sure to also stream it # # streams back results, make sure to also stream it
'to_trio' in args # 'to_trio' in args
): # ):
kwargs['_treat_as_stream'] = True # kwargs['_treat_as_stream'] = True
return await tractor.to_asyncio.run_task( return await tractor.to_asyncio.run_task(
_aio_run_client_method, _aio_run_client_method,
@ -968,7 +990,7 @@ def get_client_proxy(
return proxy return proxy
@asynccontextmanager @acm
async def get_client( async def get_client(
**kwargs, **kwargs,
) -> Client: ) -> Client:
@ -1037,8 +1059,10 @@ def normalize(
async def get_bars( async def get_bars(
sym: str, sym: str,
end_dt: str = "", end_dt: str = "",
) -> (dict, np.ndarray): ) -> (dict, np.ndarray):
_err: Optional[Exception] = None _err: Optional[Exception] = None
@ -1066,10 +1090,20 @@ async def get_bars(
# TODO: retreive underlying ``ib_insync`` error? # TODO: retreive underlying ``ib_insync`` error?
if err.code == 162: if err.code == 162:
# TODO: so this error is normally raised (it seems) if
# we try to retrieve history for a time range for which
# there is none. in that case we should not only report
# the "empty range" but also do a iteration on the time
# step for ``next_dt`` to see if we can pull older
# history.
if 'HMDS query returned no data' in err.message: if 'HMDS query returned no data' in err.message:
# means we hit some kind of historical "dead zone" # means we hit some kind of historical "empty space"
# and further requests seem to always cause # and further requests will need to decrement the
# throttling despite the rps being low # start time dt in order to not receive a further
# error?
# OLDER: seem to always cause throttling despite low rps
# raise err
break break
elif 'No market data permissions for' in err.message: elif 'No market data permissions for' in err.message:
@ -1092,8 +1126,7 @@ async def get_bars(
fails += 1 fails += 1
continue continue
return (None, None) return None, None
# else: # throttle wasn't fixed so error out immediately # else: # throttle wasn't fixed so error out immediately
# raise _err # raise _err
@ -1108,7 +1141,7 @@ async def backfill_bars(
# on that until we have the `marketstore` daemon in place in which # on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys # case the shm size will be driven by user config and available sys
# memory. # memory.
count: int = 24, count: int = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -1119,7 +1152,13 @@ async def backfill_bars(
https://github.com/pikers/piker/issues/128 https://github.com/pikers/piker/issues/128
""" """
(first_bars, bars_array, next_dt), fails = await get_bars(sym) out, fails = await get_bars(sym)
if out is None:
raise RuntimeError("Could not pull currrent history?!")
(first_bars, bars_array, next_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# write historical data to buffer # write historical data to buffer
shm.push(bars_array) shm.push(bars_array)
@ -1182,14 +1221,21 @@ _quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream( async def _setup_quote_stream(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
symbol: str, symbol: str,
opts: tuple[int] = ('375', '233', '236'), opts: tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
) -> None:
) -> trio.abc.ReceiveChannel:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
global _quote_streams global _quote_streams
to_trio.send_nowait(None)
async with load_aio_clients() as ( async with load_aio_clients() as (
client, client,
clients, clients,
@ -1198,23 +1244,10 @@ async def _setup_quote_stream(
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets # # define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel. # # to trio over the ``to_trio`` memory channel.
to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
ticker.updateEvent.disconnect(push) ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`") log.error(f"Disconnected stream for `{symbol}`")
client.ib.cancelMktData(contract) client.ib.cancelMktData(contract)
@ -1222,41 +1255,73 @@ async def _setup_quote_stream(
# decouple broadcast mem chan # decouple broadcast mem chan
_quote_streams.pop(symbol, None) _quote_streams.pop(symbol, None)
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except (
trio.BrokenResourceError,
# XXX: HACK, not sure why this gets left stale (probably
# due to our terrible ``tractor.to_asyncio``
# implementation for streams.. but if the mem chan
# gets left here and starts blocking just kill the feed?
# trio.WouldBlock,
):
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
# except trio.WouldBlock: # except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt # # for slow debugging purposes to avoid clobbering prompt
# # with log msgs # # with log msgs
# pass # pass
ticker.updateEvent.connect(push) ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
finally:
teardown()
return from_aio # return from_aio
async def start_aio_quote_stream( @acm
async def open_aio_quote_stream(
symbol: str, symbol: str,
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
from tractor.trionics import broadcast_receiver
global _quote_streams global _quote_streams
from_aio = _quote_streams.get(symbol) from_aio = _quote_streams.get(symbol)
if from_aio: if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer # if we already have a cached feed deliver a rx side clone to consumer
return from_aio.clone() async with broadcast_receiver(from_aio) as from_aio:
yield from_aio
return
else: async with tractor.to_asyncio.open_channel_from(
from_aio = await tractor.to_asyncio.run_task(
_setup_quote_stream, _setup_quote_stream,
symbol=symbol, symbol=symbol,
contract=contract, contract=contract,
) ) as (first, from_aio):
# cache feed for later consumers # cache feed for later consumers
_quote_streams[symbol] = from_aio _quote_streams[symbol] = from_aio
return from_aio yield from_aio
async def stream_quotes( async def stream_quotes(
@ -1285,7 +1350,10 @@ async def stream_quotes(
symbol=sym, symbol=sym,
) )
stream = await start_aio_quote_stream(symbol=sym, contract=contract) # stream = await start_aio_quote_stream(symbol=sym, contract=contract)
async with open_aio_quote_stream(
symbol=sym, contract=contract
) as stream:
# pass back some symbol info like min_tick, trading_hours, etc. # pass back some symbol info like min_tick, trading_hours, etc.
syminfo = asdict(details) syminfo = asdict(details)
@ -1350,6 +1418,7 @@ async def stream_quotes(
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
# suffix = 'exchange' # suffix = 'exchange'
# calc_price = False # should be real volume for contract # calc_price = False # should be real volume for contract
@ -1362,7 +1431,8 @@ async def stream_quotes(
# for a real volume contract we rait for the first # for a real volume contract we rait for the first
# "real" trade to take place # "real" trade to take place
if not calc_price and not ticker.rtTime: if not calc_price and not ticker.rtTime:
# spin consuming tickers until we get a real market datum # spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
continue continue
else: else:
@ -1379,7 +1449,6 @@ async def stream_quotes(
feed_is_live.set() feed_is_live.set()
# last = time.time() # last = time.time()
async with aclosing(stream):
async for ticker in stream: async for ticker in stream:
# print(f'ticker rate: {1/(time.time() - last)}') # print(f'ticker rate: {1/(time.time() - last)}')
@ -1536,13 +1605,24 @@ async def trades_dialogue(
accounts = set() accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
async with trio.open_nursery() as nurse:
for account, client in _accounts2clients.items(): for account, client in _accounts2clients.items():
# each client to an api endpoint will have it's own event stream async def open_stream(
trade_event_stream = await _trio_run_client_method( task_status: TaskStatus[
method='recv_trade_updates', trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client, client=client,
) ) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_stream)
clients.append((client, trade_event_stream)) clients.append((client, trade_event_stream))
for client in _client_cache.values(): for client in _client_cache.values():
@ -1552,7 +1632,10 @@ async def trades_dialogue(
accounts.add(msg.account) accounts.add(msg.account)
all_positions.append(msg.dict()) all_positions.append(msg.dict())
await ctx.started((all_positions, accounts)) await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,