Port to new streaming api, yield whole tickers

its_happening
Tyler Goodlet 2020-07-15 08:40:20 -04:00
parent 50f903d7c5
commit 8fa569787d
1 changed files with 71 additions and 81 deletions

View File

@ -2,28 +2,35 @@
Interactive Brokers API backend.
Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware apis must be spawned with
built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
import asyncio
from contextlib import asynccontextmanager
from dataclasses import asdict
from functools import partial
from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator
import asyncio
import inspect
from typing import List, Dict, Any, Tuple
from contextlib import asynccontextmanager
import itertools
import time
import tractor
from async_generator import aclosing
import ib_insync as ibis
from ib_insync.ticker import Ticker
from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker
import ib_insync as ibis
import tractor
from ..log import get_logger, get_console_log
from ..data import maybe_spawn_brokerd
from ..ui._source import from_df
log = get_logger(__name__)
# passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = {
'infect_asyncio': True,
}
_time_units = {
's': ' sec',
'm': ' mins',
@ -87,12 +94,10 @@ class Client:
whatToShow='TRADES',
useRTH=False
)
# TODO: raise underlying error here
assert bars
# barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True)
# convert to pandas dataframe:
df = ibis.util.df(bars)
# print(df[['date', 'open', 'high', 'low', 'close', 'volume']])
from piker.ui._source import from_df
return from_df(df)
async def search_stocks(
@ -194,7 +199,6 @@ class Client:
# default config ports
_tws_port: int = 7497
_gw_port: int = 4002
# list of ports to try in order
_try_ports = [_tws_port, _gw_port]
@ -202,10 +206,20 @@ _try_ports = [_tws_port, _gw_port]
async def _aio_get_client(
host: str = '127.0.0.1',
port: int = None,
client_id: int = 1,
client_id: Optional[int] = None,
) -> Client:
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
"""
if client_id is None:
# if this is a persistent brokerd, try to allocate a new id for
# each client
try:
ss = tractor.current_actor().statespace
client_id = next(ss.setdefault('client_ids', itertools.count()))
except RuntimeError:
# tractor likely isn't running
client_id = 1
ib = ibis.IB()
ports = _try_ports if port is None else [port]
_err = None
@ -271,9 +285,7 @@ async def _trio_run_client_method(
return result
def get_method_proxy(portal, target):
class MethodProxy:
class _MethodProxy:
def __init__(self, portal: tractor._portal.Portal):
self._portal = portal
@ -290,7 +302,10 @@ def get_method_proxy(portal, target):
**kwargs
)
proxy = MethodProxy(portal)
def get_method_proxy(portal, target) -> _MethodProxy:
proxy = _MethodProxy(portal)
# mock all remote methods
for name, method in inspect.getmembers(
@ -303,33 +318,6 @@ def get_method_proxy(portal, target):
return proxy
@asynccontextmanager
async def maybe_spawn_brokerd(
**kwargs,
) -> tractor._portal.Portal:
async with tractor.find_actor('brokerd_ib') as portal:
# WTF: why doesn't this work?
print(__name__)
if portal is not None:
yield portal
else: # no broker daemon created yet
async with tractor.open_nursery() as n:
# XXX: this needs to somehow be hidden
portal = await n.start_actor(
'brokerd_ib',
rpc_module_paths=[__name__],
infect_asyncio=True,
)
async with tractor.wait_for_actor(
'brokerd_ib'
) as portal:
yield portal
# client code may block indefinitely so cancel when
# teardown is invoked
await n.cancel()
@asynccontextmanager
async def get_client(
**kwargs,
@ -337,45 +325,47 @@ async def get_client(
"""Init the ``ib_insync`` client in another actor and return
a method proxy to it.
"""
async with maybe_spawn_brokerd(**kwargs) as portal:
async with maybe_spawn_brokerd(
brokername='ib',
expose_mods=[__name__],
infect_asyncio=True,
**kwargs
) as portal:
yield get_method_proxy(portal, Client)
async def trio_stream_ticker(sym):
async def stream_quotes(
symbols: List[str],
) -> AsyncGenerator[str, Dict[str, Any]]:
"""Stream symbol quotes.
This is a ``trio`` callable routine meant to be invoked
once the brokerd is up.
"""
get_console_log('info')
stream = await tractor.to_asyncio.run_task(
_trio_run_client_method,
method='stream_ticker',
symbol=sym,
symbol=symbols[0],
)
async with aclosing(stream):
async for ticker in stream:
# TODO: validate this value
lft = ticker.rtTime
for tick_data in ticker.ticks:
value = tick_data._asdict()
now = time.time()
value['time'] = now
value['last_fill_time'] = lft
if lft:
# convert from milliseconds
lft = float(lft) / 1000.
value['latency'] = now - lft
# convert named tuples to dicts so we send usable keys
# for tick_data in ticker.ticks:
ticker.ticks = [td._asdict() for td in ticker.ticks]
yield value
data = asdict(ticker)
# add time stamps for downstream latency measurements
data['brokerd_ts'] = time.time()
if ticker.rtTime:
data['rtTime_s'] = float(ticker.rtTime) / 1000.
async def stream_from_brokerd(sym):
yield data
async with maybe_spawn_brokerd() as portal:
stream = await portal.run(
__name__,
'trio_stream_ticker',
sym=sym,
)
async for tick in stream:
print(f"trio got: {tick}")
# ugh, clear ticks since we've consumed them
ticker.ticks = []
if __name__ == '__main__':