Drop task-per-method `trio`-`asyncio` proxying

Use method proxies through the remaining endpoints and drop the old
spawn-a-task-per-method-call style helpers from module.
pre_flow
Tyler Goodlet 2022-05-15 13:32:39 -04:00
parent 58da3ceecf
commit 4d4c7825e5
1 changed files with 185 additions and 230 deletions

View File

@ -1143,6 +1143,13 @@ def get_preferred_data_client(
clients: dict[str, Client], clients: dict[str, Client],
) -> tuple[str, Client]: ) -> tuple[str, Client]:
'''
Load and return the (first found) `Client` instance that is
preferred and should be used for data by iterating, in priority
order, the ``ib.prefer_data_account: list[str]`` account names in
the users ``brokers.toml`` file.
'''
conf = get_config() conf = get_config()
data_accounts = conf['prefer_data_account'] data_accounts = conf['prefer_data_account']
@ -1165,7 +1172,6 @@ async def open_data_client() -> MethodProxy:
and deliver that client wrapped in a ``MethodProxy``. and deliver that client wrapped in a ``MethodProxy``.
''' '''
async with ( async with (
open_client_proxies() as (proxies, clients), open_client_proxies() as (proxies, clients),
): ):
@ -1179,60 +1185,6 @@ async def open_data_client() -> MethodProxy:
yield proxy yield proxy
async def _aio_run_client_method(
meth: str,
to_trio=None,
from_trio=None,
client=None,
**kwargs,
) -> None:
async with load_aio_clients() as accts2clients:
client = list(accts2clients.values())[0]
async_meth = getattr(client, meth)
# handle streaming methods
args = tuple(inspect.getfullargspec(async_meth).args)
if to_trio and 'to_trio' in args:
kwargs['to_trio'] = to_trio
log.runtime(f'Running {meth}({kwargs})')
return await async_meth(**kwargs)
async def _trio_run_client_method(
method: str,
client: Optional[Client] = None,
**kwargs,
) -> None:
'''
Asyncio entry point to run tasks against the ``ib_insync`` api.
'''
ca = tractor.current_actor()
assert ca.is_infected_aio()
# if the method is an *async gen* stream for it
# meth = getattr(Client, method)
# args = tuple(inspect.getfullargspec(meth).args)
# if inspect.isasyncgenfunction(meth) or (
# # if the method is an *async func* but manually
# # streams back results, make sure to also stream it
# 'to_trio' in args
# ):
# kwargs['_treat_as_stream'] = True
return await to_asyncio.run_task(
_aio_run_client_method,
meth=method,
client=client,
**kwargs
)
class MethodProxy: class MethodProxy:
def __init__( def __init__(
@ -1830,6 +1782,9 @@ async def _setup_quote_stream(
''' '''
Stream a ticker using the std L1 api. Stream a ticker using the std L1 api.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
''' '''
global _quote_streams global _quote_streams
@ -1926,6 +1881,7 @@ async def open_aio_quote_stream(
_setup_quote_stream, _setup_quote_stream,
symbol=symbol, symbol=symbol,
contract=contract, contract=contract,
) as (first, from_aio): ) as (first, from_aio):
# cache feed for later consumers # cache feed for later consumers
@ -1956,122 +1912,120 @@ async def stream_quotes(
sym = symbols[0] sym = symbols[0]
log.info(f'request for real-time quotes: {sym}') log.info(f'request for real-time quotes: {sym}')
con, first_ticker, details = await _trio_run_client_method( async with open_data_client() as proxy:
method='get_sym_details',
symbol=sym,
)
first_quote = normalize(first_ticker)
# print(f'first quote: {first_quote}')
def mk_init_msgs() -> dict[str, dict]: con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
# pass back some symbol info like min_tick, trading_hours, etc. first_quote = normalize(first_ticker)
syminfo = asdict(details) # print(f'first quote: {first_quote}')
syminfo.update(syminfo['contract'])
# nested dataclass we probably don't need and that won't IPC serialize def mk_init_msgs() -> dict[str, dict]:
syminfo.pop('secIdList') # pass back some symbol info like min_tick, trading_hours, etc.
syminfo = asdict(details)
syminfo.update(syminfo['contract'])
# TODO: more consistent field translation # nested dataclass we probably don't need and that won't IPC
atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] # serialize
syminfo.pop('secIdList')
# for stocks it seems TWS reports too small a tick size # TODO: more consistent field translation
# such that you can't submit orders with that granularity? atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
min_tick = 0.01 if atype == 'stock' else 0
syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) # for stocks it seems TWS reports too small a tick size
# such that you can't submit orders with that granularity?
min_tick = 0.01 if atype == 'stock' else 0
# for "traditional" assets, volume is normally discreet, not a float syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
syminfo['lot_tick_size'] = 0.0
# TODO: for loop through all symbols passed in # for "traditional" assets, volume is normally discreet, not
init_msgs = { # a float
# pass back token, and bool, signalling if we're the writer syminfo['lot_tick_size'] = 0.0
# and that history has been written
sym: { # TODO: for loop through all symbols passed in
'symbol_info': syminfo, init_msgs = {
'fqsn': first_quote['fqsn'], # pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': syminfo,
'fqsn': first_quote['fqsn'],
}
} }
} return init_msgs
return init_msgs
init_msgs = mk_init_msgs() init_msgs = mk_init_msgs()
# TODO: we should instead spawn a task that waits on a feed to start # TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff. # and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(1): with trio.move_on_after(1):
contract, first_ticker, details = await _trio_run_client_method( contract, first_ticker, details = await proxy.get_quote(symbol=sym)
method='get_quote',
symbol=sym,
)
# it might be outside regular trading hours so see if we can at # it might be outside regular trading hours so see if we can at
# least grab history. # least grab history.
if isnan(first_ticker.last): if isnan(first_ticker.last):
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
# it's not really live but this will unblock # it's not really live but this will unblock
# the brokerd feed task to tell the ui to update? # the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run.
await trio.sleep_forever()
return # we never expect feed to come up?
async with open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (
ibis.Commodity,
ibis.Forex
):
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we rait for the first
# "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
feed_is_live.set() feed_is_live.set()
# last = time.time() # block and let data history backfill code run.
async for ticker in stream: await trio.sleep_forever()
quote = normalize(ticker) return # we never expect feed to come up?
await send_chan.send({quote['fqsn']: quote})
async with open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (
ibis.Commodity,
ibis.Forex
):
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we rait for the first
# "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
feed_is_live.set()
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time() # last = time.time()
async for ticker in stream:
quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time()
def pack_position( def pack_position(
@ -2466,95 +2420,96 @@ async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
# load all symbols locally for fast search
# TODO: load user defined symbol set locally for fast search?
await ctx.started({}) await ctx.started({})
async with ctx.open_stream() as stream: async with open_data_client() as proxy:
async with ctx.open_stream() as stream:
last = time.time()
async for pattern in stream:
log.debug(f'received {pattern}')
now = time.time()
assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz
diff = now - last
if diff < 1.0:
log.debug('throttle sleeping')
await trio.sleep(diff)
try:
pattern = stream.receive_nowait()
except trio.WouldBlock:
pass
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client
# side will cache a null set result and not showing
# anything to the use on re-searches when this query
# timed out. We probably need a special "timeout" msg
# or something...
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
continue
log.debug(f'searching for {pattern}')
last = time.time() last = time.time()
# async batch search using api stocks endpoint and module async for pattern in stream:
# defined adhoc symbol set. log.debug(f'received {pattern}')
stock_results = [] now = time.time()
async def stash_results(target: Awaitable[list]): assert pattern, 'IB can not accept blank search pattern'
stock_results.extend(await target)
async with trio.open_nursery() as sn: # throttle search requests to no faster then 1Hz
sn.start_soon( diff = now - last
stash_results, if diff < 1.0:
_trio_run_client_method( log.debug('throttle sleeping')
method='search_symbols', await trio.sleep(diff)
pattern=pattern, try:
upto=5, pattern = stream.receive_nowait()
except trio.WouldBlock:
pass
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client
# side will cache a null set result and not showing
# anything to the use on re-searches when this query
# timed out. We probably need a special "timeout" msg
# or something...
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
continue
log.debug(f'searching for {pattern}')
last = time.time()
# async batch search using api stocks endpoint and module
# defined adhoc symbol set.
stock_results = []
async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
async with trio.open_nursery() as sn:
sn.start_soon(
stash_results,
proxy.search_symbols(
pattern=pattern,
upto=5,
),
) )
)
# trigger async request # trigger async request
await trio.sleep(0) await trio.sleep(0)
# match against our ad-hoc set immediately # match against our ad-hoc set immediately
adhoc_matches = fuzzy.extractBests( adhoc_matches = fuzzy.extractBests(
pattern,
list(_adhoc_futes_set),
score_cutoff=90,
)
log.info(f'fuzzy matched adhocs: {adhoc_matches}')
adhoc_match_results = {}
if adhoc_matches:
# TODO: do we need to pull contract details?
adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extractBests(
pattern, pattern,
list(_adhoc_futes_set), stock_results,
score_cutoff=90, score_cutoff=50,
) )
log.info(f'fuzzy matched adhocs: {adhoc_matches}')
adhoc_match_results = {}
if adhoc_matches:
# TODO: do we need to pull contract details?
adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}') matches = adhoc_match_results | {
stock_matches = fuzzy.extractBests( item[0]: {} for item in stock_matches
pattern, }
stock_results, # TODO: we used to deliver contract details
score_cutoff=50, # {item[2]: item[0] for item in stock_matches}
)
matches = adhoc_match_results | { log.debug(f"sending matches: {matches.keys()}")
item[0]: {} for item in stock_matches await stream.send(matches)
}
# TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches}
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)
async def data_reset_hack( async def data_reset_hack(