Add context-styled `asyncio` client proxy for ib
This adds a new client manager-factory: `open_client_proxy()` which uses the newer `tractor.to_asyncio.open_channel_from()` (and thus the inter-loop-task-channel style) a `aio_client_method_relay()` and a re-implemented `MethodProxy` wrapper to allow transparently calling `asyncio` client methods from `trio` tasks. Use this proxy in the history backfiller task and add a new (prototype) `open_history_client()` which will be used in the new storage management layer. Drop `get_client()` which was the portal wrapping equivalent of the same proxy but with a one-task-per-call approach. Oh, and `Client.bars()` can take `datetime`, so let's use it B)broker_bumpz
parent
7936dcafbf
commit
b26b66cc66
|
@ -29,8 +29,9 @@ from functools import partial
|
|||
import itertools
|
||||
from math import isnan
|
||||
from typing import (
|
||||
Any, Optional,
|
||||
Any, Callable, Optional,
|
||||
AsyncIterator, Awaitable,
|
||||
Union,
|
||||
)
|
||||
import asyncio
|
||||
from pprint import pformat
|
||||
|
@ -43,6 +44,7 @@ import time
|
|||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import to_asyncio
|
||||
from async_generator import aclosing
|
||||
from ib_insync.wrapper import RequestError
|
||||
from ib_insync.contract import Contract, ContractDetails, Option
|
||||
|
@ -58,7 +60,7 @@ import numpy as np
|
|||
|
||||
from .. import config
|
||||
from ..log import get_logger, get_console_log
|
||||
from .._daemon import maybe_spawn_brokerd
|
||||
# from .._daemon import maybe_spawn_brokerd
|
||||
from ..data._source import from_df
|
||||
from ..data._sharedmem import ShmArray
|
||||
from ._util import SymbolNotFound, NoData
|
||||
|
@ -217,11 +219,12 @@ _enters = 0
|
|||
|
||||
|
||||
class Client:
|
||||
"""IB wrapped for our broker backend API.
|
||||
'''
|
||||
IB wrapped for our broker backend API.
|
||||
|
||||
Note: this client requires running inside an ``asyncio`` loop.
|
||||
|
||||
"""
|
||||
'''
|
||||
_contracts: dict[str, Contract] = {}
|
||||
|
||||
def __init__(
|
||||
|
@ -242,20 +245,22 @@ class Client:
|
|||
self,
|
||||
symbol: str,
|
||||
# EST in ISO 8601 format is required... below is EPOCH
|
||||
start_dt: str = "1970-01-01T00:00:00.000000-05:00",
|
||||
end_dt: str = "",
|
||||
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
|
||||
end_dt: Union[datetime, str ] = "",
|
||||
|
||||
sample_period_s: str = 1, # ohlc sample period
|
||||
period_count: int = int(2e3), # <- max per 1s sample query
|
||||
|
||||
is_paid_feed: bool = False, # placeholder
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Retreive OHLCV bars for a symbol over a range to the present.
|
||||
"""
|
||||
'''
|
||||
Retreive OHLCV bars for a symbol over a range to the present.
|
||||
|
||||
'''
|
||||
bars_kwargs = {'whatToShow': 'TRADES'}
|
||||
|
||||
global _enters
|
||||
print(f'ENTER BARS {_enters}')
|
||||
print(f'ENTER BARS {_enters} @ end={end_dt}')
|
||||
_enters += 1
|
||||
|
||||
contract = await self.find_contract(symbol)
|
||||
|
@ -984,7 +989,7 @@ async def _trio_run_client_method(
|
|||
# ):
|
||||
# kwargs['_treat_as_stream'] = True
|
||||
|
||||
return await tractor.to_asyncio.run_task(
|
||||
return await to_asyncio.run_task(
|
||||
_aio_run_client_method,
|
||||
meth=method,
|
||||
client=client,
|
||||
|
@ -992,60 +997,119 @@ async def _trio_run_client_method(
|
|||
)
|
||||
|
||||
|
||||
class _MethodProxy:
|
||||
class MethodProxy:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
portal: tractor.Portal
|
||||
chan: to_asyncio.LinkedTaskChannel,
|
||||
|
||||
) -> None:
|
||||
self._portal = portal
|
||||
self.chan = chan
|
||||
|
||||
async def _run_method(
|
||||
self,
|
||||
*,
|
||||
meth: str = None,
|
||||
**kwargs
|
||||
|
||||
) -> Any:
|
||||
return await self._portal.run(
|
||||
_trio_run_client_method,
|
||||
method=meth,
|
||||
**kwargs
|
||||
)
|
||||
'''
|
||||
Make a ``Client`` method call by requesting through the
|
||||
``tractor.to_asyncio`` layer.
|
||||
|
||||
'''
|
||||
chan = self.chan
|
||||
# send through method + ``kwargs: dict`` as pair
|
||||
await chan.send((meth, kwargs))
|
||||
return await chan.receive()
|
||||
|
||||
|
||||
def get_client_proxy(
|
||||
async def open_aio_client_method_relay(
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
|
||||
portal: tractor.Portal,
|
||||
target=Client,
|
||||
) -> None:
|
||||
|
||||
) -> _MethodProxy:
|
||||
|
||||
proxy = _MethodProxy(portal)
|
||||
|
||||
# mock all remote methods
|
||||
for name, method in inspect.getmembers(
|
||||
target, predicate=inspect.isfunction
|
||||
async with load_aio_clients() as (
|
||||
client,
|
||||
clients,
|
||||
accts2clients,
|
||||
):
|
||||
if '_' == name[0]:
|
||||
continue
|
||||
setattr(proxy, name, partial(proxy._run_method, meth=name))
|
||||
to_trio.send_nowait(client)
|
||||
|
||||
return proxy
|
||||
# relay all method requests to ``asyncio``-side client and
|
||||
# deliver back results
|
||||
while True:
|
||||
msg = await from_trio.get()
|
||||
meth_name, kwargs = msg
|
||||
|
||||
meth = getattr(client, meth_name)
|
||||
resp = await meth(**kwargs)
|
||||
|
||||
# echo the msg back
|
||||
to_trio.send_nowait(resp)
|
||||
|
||||
|
||||
@acm
|
||||
async def get_client(
|
||||
**kwargs,
|
||||
) -> Client:
|
||||
"""Init the ``ib_insync`` client in another actor and return
|
||||
a method proxy to it.
|
||||
"""
|
||||
async with maybe_spawn_brokerd(
|
||||
brokername='ib',
|
||||
infect_asyncio=True,
|
||||
**kwargs
|
||||
) as portal:
|
||||
proxy_client = get_client_proxy(portal)
|
||||
yield proxy_client
|
||||
async def open_client_proxy() -> MethodProxy:
|
||||
|
||||
try:
|
||||
async with to_asyncio.open_channel_from(
|
||||
open_aio_client_method_relay,
|
||||
) as (first, chan):
|
||||
|
||||
assert isinstance(first, Client)
|
||||
proxy = MethodProxy(chan)
|
||||
|
||||
# mock all remote methods on ib ``Client``.
|
||||
for name, method in inspect.getmembers(
|
||||
Client, predicate=inspect.isfunction
|
||||
):
|
||||
if '_' == name[0]:
|
||||
continue
|
||||
setattr(proxy, name, partial(proxy._run_method, meth=name))
|
||||
|
||||
yield proxy
|
||||
|
||||
except RequestError as err:
|
||||
code, msg = err.code, err.message
|
||||
|
||||
# TODO: retreive underlying ``ib_insync`` error?
|
||||
if (
|
||||
code == 162 and (
|
||||
'HMDS query returned no data' in msg
|
||||
or 'No market data permissions for' in msg
|
||||
)
|
||||
):
|
||||
# these cases should not cause a task crash
|
||||
log.warning(msg)
|
||||
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
# @acm
|
||||
# async def get_client(
|
||||
# **kwargs,
|
||||
|
||||
# ) -> Client:
|
||||
# '''
|
||||
# Init the ``ib_insync`` client in another actor and return
|
||||
# a method proxy to it.
|
||||
|
||||
# '''
|
||||
# async with (
|
||||
# maybe_spawn_brokerd(
|
||||
# brokername='ib',
|
||||
# infect_asyncio=True,
|
||||
# **kwargs
|
||||
# ) as portal,
|
||||
# ):
|
||||
# assert 0
|
||||
# TODO: the IPC via portal relay layer for when this current
|
||||
# actor isn't in aio mode.
|
||||
# open_client_proxy() as proxy,
|
||||
# yield proxy
|
||||
|
||||
|
||||
# https://interactivebrokers.github.io/tws-api/tick_types.html
|
||||
|
@ -1126,27 +1190,32 @@ def normalize(
|
|||
|
||||
async def get_bars(
|
||||
|
||||
proxy: MethodProxy,
|
||||
sym: str,
|
||||
end_dt: str = "",
|
||||
|
||||
) -> (dict, np.ndarray):
|
||||
'''
|
||||
Retrieve historical data from a ``trio``-side task using
|
||||
a ``MethoProxy``.
|
||||
|
||||
'''
|
||||
_err: Optional[Exception] = None
|
||||
|
||||
fails = 0
|
||||
bars: Optional[list] = None
|
||||
|
||||
for _ in range(2):
|
||||
try:
|
||||
|
||||
bars, bars_array = await _trio_run_client_method(
|
||||
method='bars',
|
||||
bars, bars_array = await proxy.bars(
|
||||
symbol=sym,
|
||||
end_dt=end_dt,
|
||||
)
|
||||
|
||||
if bars_array is None:
|
||||
raise SymbolNotFound(sym)
|
||||
|
||||
next_dt = bars[0].date
|
||||
print(f'ib datetime {next_dt}')
|
||||
|
||||
return (bars, bars_array, next_dt), fails
|
||||
|
||||
|
@ -1169,8 +1238,16 @@ async def get_bars(
|
|||
# error?
|
||||
# OLDER: seem to always cause throttling despite low rps
|
||||
|
||||
# raise err
|
||||
break
|
||||
# TODO: if there is not bars returned from the first
|
||||
# query we need to manually calculate the next step
|
||||
# back and convert to an expected datetime format.
|
||||
# if not bars:
|
||||
# raise
|
||||
|
||||
# try to decrement start point and look further back
|
||||
next_dt = bars[0].date
|
||||
print(f'ib datetime {next_dt}')
|
||||
continue
|
||||
|
||||
elif 'No market data permissions for' in err.message:
|
||||
|
||||
|
@ -1197,6 +1274,41 @@ async def get_bars(
|
|||
# raise _err
|
||||
|
||||
|
||||
@acm
|
||||
async def open_history_client(
|
||||
symbol: str,
|
||||
|
||||
) -> tuple[Callable, int]:
|
||||
|
||||
async with open_client_proxy() as proxy:
|
||||
|
||||
async def get_hist(
|
||||
end_dt: str,
|
||||
start_dt: str = '',
|
||||
|
||||
) -> tuple[np.ndarray, str]:
|
||||
|
||||
out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
|
||||
|
||||
# TODO: add logic here to handle tradable hours and only grab
|
||||
# valid bars in the range
|
||||
if out == (None, None):
|
||||
# could be trying to retreive bars over weekend
|
||||
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
||||
raise NoData(f'{end_dt}')
|
||||
|
||||
bars, bars_array, next_dt = out
|
||||
|
||||
# volume cleaning since there's -ve entries,
|
||||
# wood luv to know what crookery that is..
|
||||
vlm = bars_array['volume']
|
||||
vlm[vlm < 0] = 0
|
||||
|
||||
return bars_array, next_dt
|
||||
|
||||
yield get_hist
|
||||
|
||||
|
||||
async def backfill_bars(
|
||||
|
||||
sym: str,
|
||||
|
@ -1219,56 +1331,60 @@ async def backfill_bars(
|
|||
https://github.com/pikers/piker/issues/128
|
||||
|
||||
'''
|
||||
if platform.system() == 'Windows':
|
||||
log.warning(
|
||||
'Decreasing history query count to 4 since, windows...')
|
||||
count = 4
|
||||
# async with open_history_client(sym) as proxy:
|
||||
async with open_client_proxy() as proxy:
|
||||
|
||||
out, fails = await get_bars(sym)
|
||||
if platform.system() == 'Windows':
|
||||
log.warning(
|
||||
'Decreasing history query count to 4 since, windows...')
|
||||
count = 4
|
||||
|
||||
if out is None:
|
||||
raise RuntimeError("Could not pull currrent history?!")
|
||||
out, fails = await get_bars(proxy, sym)
|
||||
|
||||
(first_bars, bars_array, next_dt) = out
|
||||
vlm = bars_array['volume']
|
||||
vlm[vlm < 0] = 0
|
||||
if out is None:
|
||||
raise RuntimeError("Could not pull currrent history?!")
|
||||
|
||||
# write historical data to buffer
|
||||
shm.push(bars_array)
|
||||
(first_bars, bars_array, next_dt) = out
|
||||
vlm = bars_array['volume']
|
||||
vlm[vlm < 0] = 0
|
||||
|
||||
with trio.CancelScope() as cs:
|
||||
# write historical data to buffer
|
||||
shm.push(bars_array)
|
||||
|
||||
task_status.started(cs)
|
||||
with trio.CancelScope() as cs:
|
||||
|
||||
i = 0
|
||||
while i < count:
|
||||
task_status.started(cs)
|
||||
|
||||
out, fails = await get_bars(sym, end_dt=next_dt)
|
||||
i = 0
|
||||
while i < count:
|
||||
|
||||
if fails is None or fails > 1:
|
||||
break
|
||||
out, fails = await get_bars(proxy, sym, end_dt=next_dt)
|
||||
|
||||
if out == (None, None):
|
||||
# could be trying to retreive bars over weekend
|
||||
# TODO: add logic here to handle tradable hours and only grab
|
||||
# valid bars in the range
|
||||
log.error(f"Can't grab bars starting at {next_dt}!?!?")
|
||||
continue
|
||||
if fails is None or fails > 1:
|
||||
break
|
||||
|
||||
bars, bars_array, next_dt = out
|
||||
if out == (None, None):
|
||||
# could be trying to retreive bars over weekend
|
||||
# TODO: add logic here to handle tradable hours and
|
||||
# only grab valid bars in the range
|
||||
log.error(f"Can't grab bars starting at {next_dt}!?!?")
|
||||
continue
|
||||
|
||||
# volume cleaning since there's -ve entries,
|
||||
# wood luv to know what crookery that is..
|
||||
vlm = bars_array['volume']
|
||||
vlm[vlm < 0] = 0
|
||||
# TODO we should probably dig into forums to see what peeps
|
||||
# think this data "means" and then use it as an indicator of
|
||||
# sorts? dinkus has mentioned that $vlms for the day dont'
|
||||
# match other platforms nor the summary stat tws shows in
|
||||
# the monitor - it's probably worth investigating.
|
||||
bars, bars_array, next_dt = out
|
||||
|
||||
shm.push(bars_array, prepend=True)
|
||||
i += 1
|
||||
# volume cleaning since there's -ve entries,
|
||||
# wood luv to know what crookery that is..
|
||||
vlm = bars_array['volume']
|
||||
vlm[vlm < 0] = 0
|
||||
|
||||
# TODO we should probably dig into forums to see what peeps
|
||||
# think this data "means" and then use it as an indicator of
|
||||
# sorts? dinkus has mentioned that $vlms for the day dont'
|
||||
# match other platforms nor the summary stat tws shows in
|
||||
# the monitor - it's probably worth investigating.
|
||||
|
||||
shm.push(bars_array, prepend=True)
|
||||
i += 1
|
||||
|
||||
|
||||
asset_type_map = {
|
||||
|
|
Loading…
Reference in New Issue