Add context-styled `asyncio` client proxy for ib

This adds a new client manager-factory: `open_client_proxy()` which uses
the newer `tractor.to_asyncio.open_channel_from()` (and thus the
inter-loop-task-channel style) a `aio_client_method_relay()` and
a re-implemented `MethodProxy` wrapper to allow transparently calling
`asyncio` client methods from `trio` tasks. Use this proxy in the
history backfiller task and add a new (prototype)
`open_history_client()` which will be used in the new storage management
layer. Drop `get_client()` which was the portal wrapping equivalent of
the same proxy but with a one-task-per-call approach. Oh, and
`Client.bars()` can take `datetime`, so let's use it B)
mkts_backup
Tyler Goodlet 2022-03-08 07:02:37 -05:00
parent cf589c840d
commit 820dfff08a
1 changed files with 204 additions and 88 deletions

View File

@ -29,8 +29,9 @@ from functools import partial
import itertools
from math import isnan
from typing import (
Any, Optional,
Any, Callable, Optional,
AsyncIterator, Awaitable,
Union,
)
import asyncio
from pprint import pformat
@ -43,6 +44,7 @@ import time
import trio
from trio_typing import TaskStatus
import tractor
from tractor import to_asyncio
from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails, Option
@ -58,7 +60,7 @@ import numpy as np
from .. import config
from ..log import get_logger, get_console_log
from .._daemon import maybe_spawn_brokerd
# from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df
from ..data._sharedmem import ShmArray
from ._util import SymbolNotFound, NoData
@ -217,11 +219,12 @@ _enters = 0
class Client:
"""IB wrapped for our broker backend API.
'''
IB wrapped for our broker backend API.
Note: this client requires running inside an ``asyncio`` loop.
"""
'''
_contracts: dict[str, Contract] = {}
def __init__(
@ -242,20 +245,22 @@ class Client:
self,
symbol: str,
# EST in ISO 8601 format is required... below is EPOCH
start_dt: str = "1970-01-01T00:00:00.000000-05:00",
end_dt: str = "",
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
end_dt: Union[datetime, str ] = "",
sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> list[dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
"""
'''
Retreive OHLCV bars for a symbol over a range to the present.
'''
bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters}')
print(f'ENTER BARS {_enters} @ end={end_dt}')
_enters += 1
contract = await self.find_contract(symbol)
@ -984,7 +989,7 @@ async def _trio_run_client_method(
# ):
# kwargs['_treat_as_stream'] = True
return await tractor.to_asyncio.run_task(
return await to_asyncio.run_task(
_aio_run_client_method,
meth=method,
client=client,
@ -992,60 +997,119 @@ async def _trio_run_client_method(
)
class _MethodProxy:
class MethodProxy:
def __init__(
self,
portal: tractor.Portal
chan: to_asyncio.LinkedTaskChannel,
) -> None:
self._portal = portal
self.chan = chan
async def _run_method(
self,
*,
meth: str = None,
**kwargs
) -> Any:
return await self._portal.run(
_trio_run_client_method,
method=meth,
**kwargs
)
'''
Make a ``Client`` method call by requesting through the
``tractor.to_asyncio`` layer.
'''
chan = self.chan
# send through method + ``kwargs: dict`` as pair
await chan.send((meth, kwargs))
return await chan.receive()
def get_client_proxy(
async def open_aio_client_method_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
portal: tractor.Portal,
target=Client,
) -> None:
) -> _MethodProxy:
proxy = _MethodProxy(portal)
# mock all remote methods
for name, method in inspect.getmembers(
target, predicate=inspect.isfunction
async with load_aio_clients() as (
client,
clients,
accts2clients,
):
if '_' == name[0]:
continue
setattr(proxy, name, partial(proxy._run_method, meth=name))
to_trio.send_nowait(client)
return proxy
# relay all method requests to ``asyncio``-side client and
# deliver back results
while True:
msg = await from_trio.get()
meth_name, kwargs = msg
meth = getattr(client, meth_name)
resp = await meth(**kwargs)
# echo the msg back
to_trio.send_nowait(resp)
@acm
async def get_client(
**kwargs,
) -> Client:
"""Init the ``ib_insync`` client in another actor and return
a method proxy to it.
"""
async with maybe_spawn_brokerd(
brokername='ib',
infect_asyncio=True,
**kwargs
) as portal:
proxy_client = get_client_proxy(portal)
yield proxy_client
async def open_client_proxy() -> MethodProxy:
try:
async with to_asyncio.open_channel_from(
open_aio_client_method_relay,
) as (first, chan):
assert isinstance(first, Client)
proxy = MethodProxy(chan)
# mock all remote methods on ib ``Client``.
for name, method in inspect.getmembers(
Client, predicate=inspect.isfunction
):
if '_' == name[0]:
continue
setattr(proxy, name, partial(proxy._run_method, meth=name))
yield proxy
except RequestError as err:
code, msg = err.code, err.message
# TODO: retreive underlying ``ib_insync`` error?
if (
code == 162 and (
'HMDS query returned no data' in msg
or 'No market data permissions for' in msg
)
):
# these cases should not cause a task crash
log.warning(msg)
else:
raise
# @acm
# async def get_client(
# **kwargs,
# ) -> Client:
# '''
# Init the ``ib_insync`` client in another actor and return
# a method proxy to it.
# '''
# async with (
# maybe_spawn_brokerd(
# brokername='ib',
# infect_asyncio=True,
# **kwargs
# ) as portal,
# ):
# assert 0
# TODO: the IPC via portal relay layer for when this current
# actor isn't in aio mode.
# open_client_proxy() as proxy,
# yield proxy
# https://interactivebrokers.github.io/tws-api/tick_types.html
@ -1126,27 +1190,32 @@ def normalize(
async def get_bars(
proxy: MethodProxy,
sym: str,
end_dt: str = "",
) -> (dict, np.ndarray):
'''
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
'''
_err: Optional[Exception] = None
fails = 0
bars: Optional[list] = None
for _ in range(2):
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
bars, bars_array = await proxy.bars(
symbol=sym,
end_dt=end_dt,
)
if bars_array is None:
raise SymbolNotFound(sym)
next_dt = bars[0].date
print(f'ib datetime {next_dt}')
return (bars, bars_array, next_dt), fails
@ -1169,8 +1238,16 @@ async def get_bars(
# error?
# OLDER: seem to always cause throttling despite low rps
# raise err
break
# TODO: if there is not bars returned from the first
# query we need to manually calculate the next step
# back and convert to an expected datetime format.
# if not bars:
# raise
# try to decrement start point and look further back
next_dt = bars[0].date
print(f'ib datetime {next_dt}')
continue
elif 'No market data permissions for' in err.message:
@ -1197,6 +1274,41 @@ async def get_bars(
# raise _err
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
async with open_client_proxy() as proxy:
async def get_hist(
end_dt: str,
start_dt: str = '',
) -> tuple[np.ndarray, str]:
out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
if out == (None, None):
# could be trying to retreive bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(f'{end_dt}')
bars, bars_array, next_dt = out
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
return bars_array, next_dt
yield get_hist
async def backfill_bars(
sym: str,
@ -1219,56 +1331,60 @@ async def backfill_bars(
https://github.com/pikers/piker/issues/128
'''
if platform.system() == 'Windows':
log.warning(
'Decreasing history query count to 4 since, windows...')
count = 4
# async with open_history_client(sym) as proxy:
async with open_client_proxy() as proxy:
out, fails = await get_bars(sym)
if platform.system() == 'Windows':
log.warning(
'Decreasing history query count to 4 since, windows...')
count = 4
if out is None:
raise RuntimeError("Could not pull currrent history?!")
out, fails = await get_bars(proxy, sym)
(first_bars, bars_array, next_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
if out is None:
raise RuntimeError("Could not pull currrent history?!")
# write historical data to buffer
shm.push(bars_array)
(first_bars, bars_array, next_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
with trio.CancelScope() as cs:
# write historical data to buffer
shm.push(bars_array)
task_status.started(cs)
with trio.CancelScope() as cs:
i = 0
while i < count:
task_status.started(cs)
out, fails = await get_bars(sym, end_dt=next_dt)
i = 0
while i < count:
if fails is None or fails > 1:
break
out, fails = await get_bars(proxy, sym, end_dt=next_dt)
if out == (None, None):
# could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
log.error(f"Can't grab bars starting at {next_dt}!?!?")
continue
if fails is None or fails > 1:
break
bars, bars_array, next_dt = out
if out == (None, None):
# could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and
# only grab valid bars in the range
log.error(f"Can't grab bars starting at {next_dt}!?!?")
continue
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day dont'
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
bars, bars_array, next_dt = out
shm.push(bars_array, prepend=True)
i += 1
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day dont'
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
shm.push(bars_array, prepend=True)
i += 1
asset_type_map = {