Drop task-per-method `trio`-`asyncio` proxying

Use method proxies through the remaining endpoints and drop the old
spawn-a-task-per-method-call style helpers from module.
ib_dedicated_data_client
Tyler Goodlet 2022-05-15 13:32:39 -04:00
parent a96f1dec3a
commit 6f172479eb
1 changed files with 185 additions and 230 deletions

View File

@ -1143,6 +1143,13 @@ def get_preferred_data_client(
clients: dict[str, Client], clients: dict[str, Client],
) -> tuple[str, Client]: ) -> tuple[str, Client]:
'''
Load and return the (first found) `Client` instance that is
preferred and should be used for data by iterating, in priority
order, the ``ib.prefer_data_account: list[str]`` account names in
the users ``brokers.toml`` file.
'''
conf = get_config() conf = get_config()
data_accounts = conf['prefer_data_account'] data_accounts = conf['prefer_data_account']
@ -1165,7 +1172,6 @@ async def open_data_client() -> MethodProxy:
and deliver that client wrapped in a ``MethodProxy``. and deliver that client wrapped in a ``MethodProxy``.
''' '''
async with ( async with (
open_client_proxies() as (proxies, clients), open_client_proxies() as (proxies, clients),
): ):
@ -1179,60 +1185,6 @@ async def open_data_client() -> MethodProxy:
yield proxy yield proxy
async def _aio_run_client_method(
meth: str,
to_trio=None,
from_trio=None,
client=None,
**kwargs,
) -> None:
async with load_aio_clients() as accts2clients:
client = list(accts2clients.values())[0]
async_meth = getattr(client, meth)
# handle streaming methods
args = tuple(inspect.getfullargspec(async_meth).args)
if to_trio and 'to_trio' in args:
kwargs['to_trio'] = to_trio
log.runtime(f'Running {meth}({kwargs})')
return await async_meth(**kwargs)
async def _trio_run_client_method(
method: str,
client: Optional[Client] = None,
**kwargs,
) -> None:
'''
Asyncio entry point to run tasks against the ``ib_insync`` api.
'''
ca = tractor.current_actor()
assert ca.is_infected_aio()
# if the method is an *async gen* stream for it
# meth = getattr(Client, method)
# args = tuple(inspect.getfullargspec(meth).args)
# if inspect.isasyncgenfunction(meth) or (
# # if the method is an *async func* but manually
# # streams back results, make sure to also stream it
# 'to_trio' in args
# ):
# kwargs['_treat_as_stream'] = True
return await to_asyncio.run_task(
_aio_run_client_method,
meth=method,
client=client,
**kwargs
)
class MethodProxy: class MethodProxy:
def __init__( def __init__(
@ -1830,6 +1782,9 @@ async def _setup_quote_stream(
''' '''
Stream a ticker using the std L1 api. Stream a ticker using the std L1 api.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
''' '''
global _quote_streams global _quote_streams
@ -1926,6 +1881,7 @@ async def open_aio_quote_stream(
_setup_quote_stream, _setup_quote_stream,
symbol=symbol, symbol=symbol,
contract=contract, contract=contract,
) as (first, from_aio): ) as (first, from_aio):
# cache feed for later consumers # cache feed for later consumers
@ -1956,10 +1912,9 @@ async def stream_quotes(
sym = symbols[0] sym = symbols[0]
log.info(f'request for real-time quotes: {sym}') log.info(f'request for real-time quotes: {sym}')
con, first_ticker, details = await _trio_run_client_method( async with open_data_client() as proxy:
method='get_sym_details',
symbol=sym, con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
)
first_quote = normalize(first_ticker) first_quote = normalize(first_ticker)
# print(f'first quote: {first_quote}') # print(f'first quote: {first_quote}')
@ -1968,7 +1923,8 @@ async def stream_quotes(
syminfo = asdict(details) syminfo = asdict(details)
syminfo.update(syminfo['contract']) syminfo.update(syminfo['contract'])
# nested dataclass we probably don't need and that won't IPC serialize # nested dataclass we probably don't need and that won't IPC
# serialize
syminfo.pop('secIdList') syminfo.pop('secIdList')
# TODO: more consistent field translation # TODO: more consistent field translation
@ -1980,7 +1936,8 @@ async def stream_quotes(
syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
# for "traditional" assets, volume is normally discreet, not a float # for "traditional" assets, volume is normally discreet, not
# a float
syminfo['lot_tick_size'] = 0.0 syminfo['lot_tick_size'] = 0.0
# TODO: for loop through all symbols passed in # TODO: for loop through all symbols passed in
@ -1999,10 +1956,7 @@ async def stream_quotes(
# TODO: we should instead spawn a task that waits on a feed to start # TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff. # and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(1): with trio.move_on_after(1):
contract, first_ticker, details = await _trio_run_client_method( contract, first_ticker, details = await proxy.get_quote(symbol=sym)
method='get_quote',
symbol=sym,
)
# it might be outside regular trading hours so see if we can at # it might be outside regular trading hours so see if we can at
# least grab history. # least grab history.
@ -2466,9 +2420,11 @@ async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
# load all symbols locally for fast search
# TODO: load user defined symbol set locally for fast search?
await ctx.started({}) await ctx.started({})
async with open_data_client() as proxy:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
last = time.time() last = time.time()
@ -2518,11 +2474,10 @@ async def open_symbol_search(
async with trio.open_nursery() as sn: async with trio.open_nursery() as sn:
sn.start_soon( sn.start_soon(
stash_results, stash_results,
_trio_run_client_method( proxy.search_symbols(
method='search_symbols',
pattern=pattern, pattern=pattern,
upto=5, upto=5,
) ),
) )
# trigger async request # trigger async request