Add a mostly actor aware API to IB backend
Infected `asyncio` support is being added to `tractor` in goodboy/tractor#121 so delegate to all that new machinery. Start building out an "actor-aware" api which takes care of all the `trio`-`asyncio` interaction for data streaming and request handling. Add a little (shudder) method proxy system which can be used to invoke client methods from another actor. Start on a streaming api in preparation for real-time charting.ib_backend
parent
1abadeb506
commit
b8209cd506
|
@ -1,16 +1,29 @@
|
|||
"""
|
||||
Interactive Brokers API backend.
|
||||
|
||||
Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
|
||||
built on it) and thus actor aware apis must be spawned with
|
||||
``infected_aio==True``.
|
||||
"""
|
||||
import asyncio
|
||||
from dataclasses import asdict
|
||||
from typing import List, Dict, Any
|
||||
from functools import partial
|
||||
import inspect
|
||||
from typing import List, Dict, Any, Tuple
|
||||
from contextlib import asynccontextmanager
|
||||
import time
|
||||
|
||||
import trio
|
||||
import tractor
|
||||
from async_generator import aclosing
|
||||
import ib_insync as ibis
|
||||
from ib_insync.ticker import Ticker
|
||||
from ib_insync.contract import Contract, ContractDetails
|
||||
|
||||
from ..log import get_logger, get_console_log
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_time_frames = {
|
||||
'1s': '1 Sec',
|
||||
|
@ -35,14 +48,14 @@ _time_frames = {
|
|||
|
||||
class Client:
|
||||
"""IB wrapped for our broker backend API.
|
||||
|
||||
Note: this client requires running inside an ``asyncio`` loop.
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
ib: ibis.IB,
|
||||
) -> None:
|
||||
self.ib = ib
|
||||
# connect data feed callback...
|
||||
self.ib.pendingTickersEvent.connect(self.on_tickers)
|
||||
|
||||
async def bars(
|
||||
self,
|
||||
|
@ -57,7 +70,7 @@ class Client:
|
|||
"""
|
||||
contract = ibis.ContFuture('ES', exchange='GLOBEX')
|
||||
# contract = ibis.Stock('WEED', 'SMART', 'CAD')
|
||||
bars = self.ib.reqHistoricalData(
|
||||
bars = await self.ib.reqHistoricalDataAsync(
|
||||
contract,
|
||||
endDateTime='',
|
||||
# durationStr='60 S',
|
||||
|
@ -88,16 +101,25 @@ class Client:
|
|||
|
||||
Return a dictionary of ``upto`` entries worth of contract details.
|
||||
"""
|
||||
descriptions = self.ib.reqMatchingSymbols(pattern)
|
||||
descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
|
||||
|
||||
futs = []
|
||||
for d in descriptions:
|
||||
con = d.contract
|
||||
futs.append(self.ib.reqContractDetailsAsync(con))
|
||||
|
||||
# batch request all details
|
||||
results = await asyncio.gather(*futs)
|
||||
|
||||
# XXX: if there is more then one entry in the details list
|
||||
details = {}
|
||||
for description in descriptions:
|
||||
con = description.contract
|
||||
deats = self.ib.reqContractDetails(con)
|
||||
# XXX: if there is more then one entry in the details list
|
||||
for details_set in results:
|
||||
# then the contract is so called "ambiguous".
|
||||
for d in deats:
|
||||
for d in details_set:
|
||||
con = d.contract
|
||||
unique_sym = f'{con.symbol}.{con.primaryExchange}'
|
||||
details[unique_sym] = asdict(d) if asdicts else d
|
||||
|
||||
if len(details) == upto:
|
||||
return details
|
||||
|
||||
|
@ -118,6 +140,22 @@ class Client:
|
|||
) -> Contract:
|
||||
raise NotImplementedError
|
||||
|
||||
async def stream_ticker(
|
||||
self,
|
||||
symbol: str,
|
||||
to_trio,
|
||||
opts: Tuple[int] = ('233', '375'),
|
||||
) -> None:
|
||||
"""Stream a ticker using the std L1 api.
|
||||
"""
|
||||
sym, exch = symbol.split('.')
|
||||
contract = ibis.Stock(sym.upper(), exchange=exch.upper())
|
||||
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
|
||||
ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
|
||||
|
||||
# let the engine run and stream
|
||||
await self.ib.disconnectedEvent
|
||||
|
||||
|
||||
# default config ports
|
||||
_tws_port: int = 7497
|
||||
|
@ -125,7 +163,7 @@ _gw_port: int = 4002
|
|||
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_client(
|
||||
async def _aio_get_client(
|
||||
host: str = '127.0.0.1',
|
||||
port: int = None,
|
||||
client_id: int = 1,
|
||||
|
@ -133,8 +171,7 @@ async def get_client(
|
|||
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
|
||||
"""
|
||||
ib = ibis.IB()
|
||||
# TODO: some detection magic to figure out if tws vs. the
|
||||
# gateway is up ad choose the appropriate port
|
||||
|
||||
if port is None:
|
||||
ports = [_tws_port, _gw_port]
|
||||
else:
|
||||
|
@ -152,91 +189,170 @@ async def get_client(
|
|||
else:
|
||||
raise ConnectionRefusedError(_err)
|
||||
|
||||
yield Client(ib)
|
||||
ib.disconnect()
|
||||
try:
|
||||
yield Client(ib)
|
||||
except BaseException:
|
||||
ib.disconnect()
|
||||
raise
|
||||
|
||||
|
||||
async def _aio_run_client_method(
|
||||
meth: str,
|
||||
to_trio,
|
||||
from_trio,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
|
||||
async with _aio_get_client() as client:
|
||||
|
||||
async_meth = getattr(client, meth)
|
||||
|
||||
# handle streaming methods
|
||||
args = tuple(inspect.getfullargspec(async_meth).args)
|
||||
if 'to_trio' in args:
|
||||
kwargs['to_trio'] = to_trio
|
||||
|
||||
return await async_meth(**kwargs)
|
||||
|
||||
|
||||
async def _trio_run_client_method(
|
||||
method: str,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
ca = tractor.current_actor()
|
||||
assert ca.is_infected_aio()
|
||||
|
||||
# if the method is an async gen stream for it
|
||||
meth = getattr(Client, method)
|
||||
if inspect.isasyncgenfunction(meth):
|
||||
kwargs['_treat_as_stream'] = True
|
||||
|
||||
# if the method is an async func but streams back results
|
||||
# make sure to also stream from it
|
||||
args = tuple(inspect.getfullargspec(meth).args)
|
||||
if 'to_trio' in args:
|
||||
kwargs['_treat_as_stream'] = True
|
||||
|
||||
result = await tractor.to_asyncio.run_task(
|
||||
_aio_run_client_method,
|
||||
meth=method,
|
||||
**kwargs
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def get_method_proxy(portal):
|
||||
|
||||
class MethodProxy:
|
||||
def __init__(self, portal: tractor._portal.Portal):
|
||||
self._portal = portal
|
||||
|
||||
async def _run_method(
|
||||
self,
|
||||
*,
|
||||
meth: str = None,
|
||||
**kwargs
|
||||
) -> Any:
|
||||
return await self._portal.run(
|
||||
__name__,
|
||||
'_trio_run_client_method',
|
||||
method=meth,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
proxy = MethodProxy(portal)
|
||||
|
||||
# mock all remote methods
|
||||
for name, method in inspect.getmembers(
|
||||
Client, predicate=inspect.isfunction
|
||||
):
|
||||
if '_' == name[0]:
|
||||
continue
|
||||
setattr(proxy, name, partial(proxy._run_method, meth=name))
|
||||
|
||||
return proxy
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_spawn_brokerd(
|
||||
**kwargs,
|
||||
) -> tractor._portal.Portal:
|
||||
async with tractor.find_actor('brokerd_ib') as portal:
|
||||
if portal is None: # no broker daemon created yet
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
# XXX: this needs to somehow be hidden
|
||||
portal = await n.start_actor(
|
||||
'brokerd_ib',
|
||||
rpc_module_paths=[__name__],
|
||||
infect_asyncio=True,
|
||||
)
|
||||
async with tractor.wait_for_actor(
|
||||
'brokerd_ib'
|
||||
) as portal:
|
||||
yield portal
|
||||
|
||||
# client code may block indefinitely so cancel when
|
||||
# teardown is invoked
|
||||
await n.cancel()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_client(
|
||||
**kwargs,
|
||||
) -> Client:
|
||||
"""Init the ``ib_insync`` client in another actor and return
|
||||
a method proxy to it.
|
||||
"""
|
||||
async with maybe_spawn_brokerd(**kwargs) as portal:
|
||||
yield get_method_proxy(portal)
|
||||
|
||||
|
||||
async def trio_stream_ticker(sym):
|
||||
get_console_log('info')
|
||||
|
||||
# con_es = ibis.ContFuture('ES', exchange='GLOBEX')
|
||||
# es = ibis.Future('ES', '20200918', exchange='GLOBEX')
|
||||
|
||||
stream = await tractor.to_asyncio.run_task(
|
||||
_trio_run_client_method,
|
||||
method='stream_ticker',
|
||||
symbol=sym,
|
||||
)
|
||||
async with aclosing(stream):
|
||||
async for ticker in stream:
|
||||
lft = ticker.lastFillTime
|
||||
for tick_data in ticker.ticks:
|
||||
value = tick_data._asdict()
|
||||
now = time.time()
|
||||
value['time'] = now
|
||||
value['last_fill_time'] = lft
|
||||
if lft:
|
||||
value['latency'] = now - lft
|
||||
yield value
|
||||
|
||||
|
||||
async def stream_from_brokerd(sym):
|
||||
|
||||
async with maybe_spawn_brokerd() as portal:
|
||||
stream = await portal.run(
|
||||
__name__,
|
||||
'trio_stream_ticker',
|
||||
sym=sym,
|
||||
)
|
||||
async for tick in stream:
|
||||
print(f"trio got: {tick}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
|
||||
con_es = ibis.ContFuture('ES', exchange='GLOBEX')
|
||||
es = ibis.Future('ES', '20200918', exchange='GLOBEX')
|
||||
spy = ibis.Stock('SPY', exchange='ARCA')
|
||||
sym = sys.argv[1]
|
||||
|
||||
# ticker = client.ib.reqTickByTickData(
|
||||
# contract,
|
||||
# tickType='Last',
|
||||
# numberOfTicks=1,
|
||||
# )
|
||||
# client.ib.reqTickByTickData(
|
||||
# contract,
|
||||
# tickType='AllLast',
|
||||
# numberOfTicks=1,
|
||||
# )
|
||||
# client.ib.reqTickByTickData(
|
||||
# contract,
|
||||
# tickType='BidAsk',
|
||||
# numberOfTicks=1,
|
||||
# )
|
||||
|
||||
# ITC (inter task comms)
|
||||
from_trio = asyncio.Queue()
|
||||
to_trio, from_aio = trio.open_memory_channel(float("inf"))
|
||||
|
||||
async def start_ib(from_trio, to_trio):
|
||||
print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!")
|
||||
async with get_client() as client:
|
||||
|
||||
# stream ticks to trio task
|
||||
def ontick(ticker: Ticker):
|
||||
for t in ticker.ticks:
|
||||
# send tick data to trio
|
||||
to_trio.send_nowait(t)
|
||||
|
||||
ticker = client.ib.reqMktData(spy, '588', False, False, None)
|
||||
ticker.updateEvent += ontick
|
||||
|
||||
n = await from_trio.get()
|
||||
assert n == 0
|
||||
|
||||
# sleep and let the engine run
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
# TODO: cmd processing from trio
|
||||
# while True:
|
||||
# n = await from_trio.get()
|
||||
# print(f"aio got: {n}")
|
||||
# to_trio.send_nowait(n + 1)
|
||||
|
||||
async def trio_main():
|
||||
print("trio_main!")
|
||||
|
||||
asyncio.create_task(
|
||||
start_ib(from_trio, to_trio)
|
||||
)
|
||||
|
||||
from_trio.put_nowait(0)
|
||||
|
||||
async for tick in from_aio:
|
||||
print(f"trio got: {tick}")
|
||||
|
||||
# TODO: send cmds to asyncio
|
||||
# from_trio.put_nowait(n + 1)
|
||||
|
||||
async def aio_main():
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
trio_done_fut = asyncio.Future()
|
||||
|
||||
def trio_done_callback(main_outcome):
|
||||
print(f"trio_main finished: {main_outcome!r}")
|
||||
trio_done_fut.set_result(main_outcome)
|
||||
|
||||
trio.lowlevel.start_guest_run(
|
||||
trio_main,
|
||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||
done_callback=trio_done_callback,
|
||||
)
|
||||
|
||||
(await trio_done_fut).unwrap()
|
||||
|
||||
asyncio.run(aio_main())
|
||||
tractor.run(
|
||||
stream_from_brokerd,
|
||||
sym,
|
||||
# XXX: must be multiprocessing
|
||||
start_method='forkserver',
|
||||
loglevel='info'
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue