Add a mostly actor aware API to IB backend

Infected `asyncio` support is being added to `tractor` in
goodboy/tractor#121 so delegate to all that new machinery.

Start building out an "actor-aware" api which takes care of all the
`trio`-`asyncio` interaction for data streaming and request handling.
Add a little (shudder) method proxy system which can be used to invoke
client methods from another actor. Start on a streaming api in
preparation for real-time charting.
its_happening
Tyler Goodlet 2020-07-02 12:54:34 -04:00
parent 72a3149dc7
commit f216d1f922
1 changed files with 213 additions and 97 deletions

View File

@ -1,16 +1,29 @@
"""
Interactive Brokers API backend.
Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware apis must be spawned with
``infected_aio==True``.
"""
import asyncio
from dataclasses import asdict
from typing import List, Dict, Any
from functools import partial
import inspect
from typing import List, Dict, Any, Tuple
from contextlib import asynccontextmanager
import time
import trio
import tractor
from async_generator import aclosing
import ib_insync as ibis
from ib_insync.ticker import Ticker
from ib_insync.contract import Contract, ContractDetails
from ..log import get_logger, get_console_log
log = get_logger(__name__)
_time_frames = {
'1s': '1 Sec',
@ -35,14 +48,14 @@ _time_frames = {
class Client:
"""IB wrapped for our broker backend API.
Note: this client requires running inside an ``asyncio`` loop.
"""
def __init__(
self,
ib: ibis.IB,
) -> None:
self.ib = ib
# connect data feed callback...
self.ib.pendingTickersEvent.connect(self.on_tickers)
async def bars(
self,
@ -57,7 +70,7 @@ class Client:
"""
contract = ibis.ContFuture('ES', exchange='GLOBEX')
# contract = ibis.Stock('WEED', 'SMART', 'CAD')
bars = self.ib.reqHistoricalData(
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime='',
# durationStr='60 S',
@ -88,16 +101,25 @@ class Client:
Return a dictionary of ``upto`` entries worth of contract details.
"""
descriptions = self.ib.reqMatchingSymbols(pattern)
details = {}
for description in descriptions:
con = description.contract
deats = self.ib.reqContractDetails(con)
descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
futs = []
for d in descriptions:
con = d.contract
futs.append(self.ib.reqContractDetailsAsync(con))
# batch request all details
results = await asyncio.gather(*futs)
# XXX: if there is more then one entry in the details list
details = {}
for details_set in results:
# then the contract is so called "ambiguous".
for d in deats:
for d in details_set:
con = d.contract
unique_sym = f'{con.symbol}.{con.primaryExchange}'
details[unique_sym] = asdict(d) if asdicts else d
if len(details) == upto:
return details
@ -118,6 +140,22 @@ class Client:
) -> Contract:
raise NotImplementedError
async def stream_ticker(
self,
symbol: str,
to_trio,
opts: Tuple[int] = ('233', '375'),
) -> None:
"""Stream a ticker using the std L1 api.
"""
sym, exch = symbol.split('.')
contract = ibis.Stock(sym.upper(), exchange=exch.upper())
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
# let the engine run and stream
await self.ib.disconnectedEvent
# default config ports
_tws_port: int = 7497
@ -125,7 +163,7 @@ _gw_port: int = 4002
@asynccontextmanager
async def get_client(
async def _aio_get_client(
host: str = '127.0.0.1',
port: int = None,
client_id: int = 1,
@ -133,8 +171,7 @@ async def get_client(
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
"""
ib = ibis.IB()
# TODO: some detection magic to figure out if tws vs. the
# gateway is up ad choose the appropriate port
if port is None:
ports = [_tws_port, _gw_port]
else:
@ -152,91 +189,170 @@ async def get_client(
else:
raise ConnectionRefusedError(_err)
try:
yield Client(ib)
except BaseException:
ib.disconnect()
raise
async def _aio_run_client_method(
meth: str,
to_trio,
from_trio,
**kwargs,
) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client:
async_meth = getattr(client, meth)
# handle streaming methods
args = tuple(inspect.getfullargspec(async_meth).args)
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
return await async_meth(**kwargs)
async def _trio_run_client_method(
method: str,
**kwargs,
) -> None:
ca = tractor.current_actor()
assert ca.is_infected_aio()
# if the method is an async gen stream for it
meth = getattr(Client, method)
if inspect.isasyncgenfunction(meth):
kwargs['_treat_as_stream'] = True
# if the method is an async func but streams back results
# make sure to also stream from it
args = tuple(inspect.getfullargspec(meth).args)
if 'to_trio' in args:
kwargs['_treat_as_stream'] = True
result = await tractor.to_asyncio.run_task(
_aio_run_client_method,
meth=method,
**kwargs
)
return result
def get_method_proxy(portal):
class MethodProxy:
def __init__(self, portal: tractor._portal.Portal):
self._portal = portal
async def _run_method(
self,
*,
meth: str = None,
**kwargs
) -> Any:
return await self._portal.run(
__name__,
'_trio_run_client_method',
method=meth,
**kwargs
)
proxy = MethodProxy(portal)
# mock all remote methods
for name, method in inspect.getmembers(
Client, predicate=inspect.isfunction
):
if '_' == name[0]:
continue
setattr(proxy, name, partial(proxy._run_method, meth=name))
return proxy
@asynccontextmanager
async def maybe_spawn_brokerd(
**kwargs,
) -> tractor._portal.Portal:
async with tractor.find_actor('brokerd_ib') as portal:
if portal is None: # no broker daemon created yet
async with tractor.open_nursery() as n:
# XXX: this needs to somehow be hidden
portal = await n.start_actor(
'brokerd_ib',
rpc_module_paths=[__name__],
infect_asyncio=True,
)
async with tractor.wait_for_actor(
'brokerd_ib'
) as portal:
yield portal
# client code may block indefinitely so cancel when
# teardown is invoked
await n.cancel()
@asynccontextmanager
async def get_client(
**kwargs,
) -> Client:
"""Init the ``ib_insync`` client in another actor and return
a method proxy to it.
"""
async with maybe_spawn_brokerd(**kwargs) as portal:
yield get_method_proxy(portal)
async def trio_stream_ticker(sym):
get_console_log('info')
# con_es = ibis.ContFuture('ES', exchange='GLOBEX')
# es = ibis.Future('ES', '20200918', exchange='GLOBEX')
stream = await tractor.to_asyncio.run_task(
_trio_run_client_method,
method='stream_ticker',
symbol=sym,
)
async with aclosing(stream):
async for ticker in stream:
lft = ticker.lastFillTime
for tick_data in ticker.ticks:
value = tick_data._asdict()
now = time.time()
value['time'] = now
value['last_fill_time'] = lft
if lft:
value['latency'] = now - lft
yield value
async def stream_from_brokerd(sym):
async with maybe_spawn_brokerd() as portal:
stream = await portal.run(
__name__,
'trio_stream_ticker',
sym=sym,
)
async for tick in stream:
print(f"trio got: {tick}")
if __name__ == '__main__':
import sys
con_es = ibis.ContFuture('ES', exchange='GLOBEX')
es = ibis.Future('ES', '20200918', exchange='GLOBEX')
spy = ibis.Stock('SPY', exchange='ARCA')
sym = sys.argv[1]
# ticker = client.ib.reqTickByTickData(
# contract,
# tickType='Last',
# numberOfTicks=1,
# )
# client.ib.reqTickByTickData(
# contract,
# tickType='AllLast',
# numberOfTicks=1,
# )
# client.ib.reqTickByTickData(
# contract,
# tickType='BidAsk',
# numberOfTicks=1,
# )
# ITC (inter task comms)
from_trio = asyncio.Queue()
to_trio, from_aio = trio.open_memory_channel(float("inf"))
async def start_ib(from_trio, to_trio):
print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with get_client() as client:
# stream ticks to trio task
def ontick(ticker: Ticker):
for t in ticker.ticks:
# send tick data to trio
to_trio.send_nowait(t)
ticker = client.ib.reqMktData(spy, '588', False, False, None)
ticker.updateEvent += ontick
n = await from_trio.get()
assert n == 0
# sleep and let the engine run
await asyncio.sleep(float('inf'))
# TODO: cmd processing from trio
# while True:
# n = await from_trio.get()
# print(f"aio got: {n}")
# to_trio.send_nowait(n + 1)
async def trio_main():
print("trio_main!")
asyncio.create_task(
start_ib(from_trio, to_trio)
tractor.run(
stream_from_brokerd,
sym,
# XXX: must be multiprocessing
start_method='forkserver',
loglevel='info'
)
from_trio.put_nowait(0)
async for tick in from_aio:
print(f"trio got: {tick}")
# TODO: send cmds to asyncio
# from_trio.put_nowait(n + 1)
async def aio_main():
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
print(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
asyncio.run(aio_main())