Merge pull request #235 from pikers/ib_client_scan

IB client scanning
teardown_guesmost_via_cs
goodboy 2021-11-01 13:28:05 -04:00 committed by GitHub
commit 0a54ed7dad
5 changed files with 502 additions and 287 deletions


@@ -47,7 +47,7 @@ _root_modules = [
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}


@@ -21,10 +21,12 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from datetime import datetime
from functools import partial
import itertools
from math import isnan
from typing import (
Any, Optional,
AsyncIterator, Awaitable,
@@ -32,8 +34,8 @@ from typing import (
import asyncio
from pprint import pformat
import inspect
import itertools
import logging
import platform
from random import randint
import time
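As the module docstring notes, every ``Client`` method actually executes on an ``asyncio`` loop inside an ``infected_aio`` actor, so trio-side code hops loops through ``tractor.to_asyncio`` (see ``_trio_run_client_method`` further down). A minimal sketch of that request/response hop, assuming ``run_task`` forwards keyword args to the target coroutine as the call below does; names and the symbol are illustrative:

import asyncio
import tractor

async def fetch_snapshot(sym: str) -> str:
    # stand-in for an ``ib_insync`` call; runs on the asyncio loop
    await asyncio.sleep(0.1)
    return f'{sym}: snapshot'

async def from_trio_side() -> str:
    # hop into the infected actor's asyncio loop and await the result
    return await tractor.to_asyncio.run_task(
        fetch_snapshot,
        sym='mnq.globex',
    )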
@@ -501,7 +503,11 @@ class Client:
contract,
snapshot=True,
)
# ensure a last price gets filled in before we deliver quote
while isnan(ticker.last):
ticker = await ticker.updateEvent
details = (await details_fute)[0]
return contract, ticker, details
@@ -569,66 +575,6 @@ class Client:
)
)
async def recv_trade_updates(
self,
to_trio: trio.abc.SendChannel,
) -> None:
"""Stream a ticker using the std L1 api.
"""
self.inline_errors(to_trio)
def push_tradesies(eventkit_obj, obj, fill=None):
"""Push events to trio task.
"""
if fill is not None:
# execution details event
item = ('fill', (obj, fill))
elif eventkit_obj.name() == 'positionEvent':
item = ('position', obj)
else:
item = ('status', obj)
log.info(f'eventkit event ->\n{pformat(item)}')
try:
to_trio.send_nowait(item)
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api
for ev_name in [
'orderStatusEvent', # all order updates
'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
# XXX: not sure yet if we need these
# 'updatePortfolioEvent',
# XXX: these all seem to be weird ib_insync internal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
# 'orderModifyEvent',
# 'cancelOrderEvent',
# 'openOrderEvent',
]:
eventkit_obj = getattr(self.ib, ev_name)
handler = partial(push_tradesies, eventkit_obj)
eventkit_obj.connect(handler)
# let the engine run and stream
await self.ib.disconnectedEvent
def inline_errors(
self,
to_trio: trio.abc.SendChannel,
@@ -674,6 +620,71 @@ class Client:
return self.ib.positions(account=account)
async def recv_trade_updates(
client: Client,
to_trio: trio.abc.SendChannel,
) -> None:
"""Stream a ticker using the std L1 api.
"""
client.inline_errors(to_trio)
# sync with trio task
to_trio.send_nowait(None)
def push_tradesies(eventkit_obj, obj, fill=None):
"""Push events to trio task.
"""
if fill is not None:
# execution details event
item = ('fill', (obj, fill))
elif eventkit_obj.name() == 'positionEvent':
item = ('position', obj)
else:
item = ('status', obj)
log.info(f'eventkit event ->\n{pformat(item)}')
try:
to_trio.send_nowait(item)
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api
for ev_name in [
'orderStatusEvent', # all order updates
'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
# XXX: not sure yet if we need these
# 'updatePortfolioEvent',
# XXX: these all seem to be weird ib_insync internal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
# 'orderModifyEvent',
# 'cancelOrderEvent',
# 'openOrderEvent',
]:
eventkit_obj = getattr(client.ib, ev_name)
handler = partial(push_tradesies, eventkit_obj)
eventkit_obj.connect(handler)
# let the engine run and stream
await client.ib.disconnectedEvent
# default config ports
_tws_port: int = 7497
_gw_port: int = 4002
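``recv_trade_updates`` is now a module-level function precisely so it can be handed to ``tractor.to_asyncio.open_channel_from`` (as ``trades_dialogue`` does further down); the early ``to_trio.send_nowait(None)`` is the handshake value that arrives as ``first`` on the trio side. A condensed consumer sketch, assuming a connected ``client``:

async with tractor.to_asyncio.open_channel_from(
    recv_trade_updates,
    client=client,
) as (first, trade_event_stream):
    assert first is None  # the sync handshake sent above
    async for tag, msg in trade_event_stream:
        # ('status' | 'fill' | 'position', payload) tuples
        # as pushed by push_tradesies()
        ...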
@@ -684,7 +695,8 @@ _try_ports = [
# TODO: remove the randint stuff and use proper error checking in client
# factor below..
_client_ids = itertools.count(randint(1, 100))
_client_cache = {}
_client_cache: dict[tuple[str, int], Client] = {}
_scan_ignore: set[tuple[str, int]] = set()
def get_config() -> dict[str, Any]:
@@ -703,7 +715,7 @@ def get_config() -> dict[str, Any]:
_accounts2clients: dict[str, Client] = {}
@asynccontextmanager
@acm
async def load_aio_clients(
host: str = '127.0.0.1',
@@ -718,8 +730,7 @@ async def load_aio_clients(
TODO: consider doing this with a ctx mngr eventually?
'''
global _accounts2clients
global _client_cache
global _accounts2clients, _client_cache, _scan_ignore
conf = get_config()
ib = None
@@ -727,7 +738,17 @@ async def load_aio_clients(
# attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection.
host = conf.get('host', '127.0.0.1')
localhost = '127.0.0.1'
host, hosts = conf.get('host'), conf.get('hosts')
if not (hosts or host):
host = localhost
if not hosts:
hosts = [host]
elif host and hosts:
raise ValueError(
'Specify only one of `host` or `hosts` in `brokers.toml` config')
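In other words a ``brokers.toml`` ``ib`` section may define either a single ``host`` or a ``hosts`` list, but never both; omitting both falls back to localhost. Hypothetical parsed-config shapes (values illustrative) that this logic accepts:

# single endpoint (legacy style):
conf = {'host': '127.0.0.1', 'ports': {'gw': 4002, 'tws': 7497}}

# multiple scan targets:
conf = {'hosts': ['127.0.0.1', '192.168.0.10'], 'ports': {'gw': 4002}}

# defining both keys at once triggers the ValueError above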
ports = conf.get(
'ports',
@@ -735,19 +756,22 @@ async def load_aio_clients(
{
'gw': 4002,
'tws': 7497,
'order': ['gw', 'tws']
# 'order': ['gw', 'tws']
}
)
order = ports['order']
order = ports.pop('order', None)
if order:
log.warning('`ports.order` section in `brokers.toml` is deprecated')
accounts_def = config.load_accounts(['ib'])
try_ports = [ports[key] for key in order]
try_ports = list(ports.values())
ports = try_ports if port is None else [port]
# we_connected = []
connect_timeout = 0.5 if platform.system() != 'Windows' else 1
combos = list(itertools.product(hosts, ports))
we_connected = []
# allocate new and/or reload disconnected but cached clients
try:
# try:
# TODO: support multiple clients allowing for execution on
# multiple accounts (including a paper instance running on the
# same machine) and switching between accounts in the EMs
@@ -756,19 +780,35 @@ async def load_aio_clients(
# (re)load any and all clients that can be found
# from connection details in ``brokers.toml``.
for port in ports:
client = _client_cache.get((host, port))
for host, port in combos:
sockaddr = (host, port)
client = _client_cache.get(sockaddr)
accounts_found: dict[str, Client] = {}
if not client or not client.ib.isConnected():
if (
client and client.ib.isConnected() or
sockaddr in _scan_ignore
):
continue
try:
ib = NonShittyIB()
# if this is a persistent brokerd, try to allocate
# a new id for each client
client_id = next(_client_ids)
# XXX: not sure if we ever really need to increment the
# client id if teardown is successful.
client_id = 616
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id)
await ib.connectAsync(
host,
port,
clientId=client_id,
# this timeout is sensitive on windows and will
# fail without a good "timeout error" so be
# careful.
timeout=connect_timeout,
)
# create and cache client
client = Client(ib)
@@ -801,15 +841,45 @@ async def load_aio_clients(
# update all actor-global caches
log.info(f"Caching client for {(host, port)}")
_client_cache[(host, port)] = client
we_connected.append(client)
# we_connected.append((host, port, client))
# TODO: don't do it this way, get a gud to_asyncio
# context / .start() system goin..
def pop_and_discon():
log.info(f'Disconnecting client {client}')
client.ib.disconnect()
_client_cache.pop((host, port), None)
# NOTE: the above callback **CAN'T FAIL** or shm won't get
# torn down correctly ...
tractor._actor._lifetime_stack.callback(pop_and_discon)
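``tractor._actor._lifetime_stack`` is internal (hence the TODO) but behaves like a ``contextlib.ExitStack`` whose callbacks run at actor teardown. A toy equivalent of the registration above; per the NOTE, the callback body must be infallible:

from contextlib import ExitStack

stack = ExitStack()

def pop_and_discon() -> None:
    # keep this infallible: a raising teardown callback here is
    # what breaks shm cleanup in the actor case noted above
    print('disconnecting client')

stack.callback(pop_and_discon)
stack.close()  # callbacks run LIFO, mirroring actor teardown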
_accounts2clients.update(accounts_found)
except ConnectionRefusedError as ce:
except (
ConnectionRefusedError,
# TODO: if trying to scan for remote api clients
# pretty sure we need to catch this, though it
# definitely needs a shorter timeout since it hangs
# for like 5s..
asyncio.exceptions.TimeoutError,
OSError,
) as ce:
_err = ce
log.warning(f'Failed to connect on {port}')
# cache logic to avoid rescanning if we already have all
# clients loaded.
_scan_ignore.add(sockaddr)
else:
if not _client_cache:
raise ConnectionRefusedError(_err)
raise ConnectionError(
'No ib APIs could be found scanning @:\n'
f'{pformat(combos)}\n'
'Check your `brokers.toml` and/or network'
) from _err
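Net effect of the loop above: each (host, port) combo is probed once with a short timeout, failures are cached in ``_scan_ignore`` so reloads don't re-probe dead endpoints, and the ``ConnectionError`` only fires if nothing ever connected. Stripped of the IB specifics, the scan reduces to roughly this sketch (``connect`` stands in for the ``ib.connectAsync()`` dance):

import itertools

_cache: dict[tuple[str, int], object] = {}
_ignore: set[tuple[str, int]] = set()

async def scan(hosts, ports, connect) -> None:
    err = None
    for sockaddr in itertools.product(hosts, ports):
        if sockaddr in _cache or sockaddr in _ignore:
            # already loaded, or known-dead from a prior scan
            continue
        try:
            _cache[sockaddr] = await connect(*sockaddr)
        except (ConnectionRefusedError, OSError) as e:
            err = e
            _ignore.add(sockaddr)
    if not _cache:
        raise ConnectionError('no endpoints found') from err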
# retrieve first loaded client
clients = list(_client_cache.values())
@@ -818,10 +888,14 @@ async def load_aio_clients(
yield client, _client_cache, _accounts2clients
except BaseException:
for client in we_connected:
client.ib.disconnect()
raise
# TODO: this in a way that works xD
# finally:
# pass
# # async with trio.CancelScope(shield=True):
# for host, port, client in we_connected:
# client.ib.disconnect()
# _client_cache.pop((host, port))
# raise
async def _aio_run_client_method(
@@ -862,16 +936,16 @@ async def _trio_run_client_method(
assert ca.is_infected_aio()
# if the method is an *async gen* stream for it
meth = getattr(Client, method)
# meth = getattr(Client, method)
args = tuple(inspect.getfullargspec(meth).args)
# args = tuple(inspect.getfullargspec(meth).args)
if inspect.isasyncgenfunction(meth) or (
# if the method is an *async func* but manually
# streams back results, make sure to also stream it
'to_trio' in args
):
kwargs['_treat_as_stream'] = True
# if inspect.isasyncgenfunction(meth) or (
# # if the method is an *async func* but manually
# # streams back results, make sure to also stream it
# 'to_trio' in args
# ):
# kwargs['_treat_as_stream'] = True
return await tractor.to_asyncio.run_task(
_aio_run_client_method,
@@ -921,7 +995,7 @@ def get_client_proxy(
return proxy
@asynccontextmanager
@acm
async def get_client(
**kwargs,
) -> Client:
@@ -990,8 +1064,10 @@ def normalize(
async def get_bars(
sym: str,
end_dt: str = "",
) -> (dict, np.ndarray):
_err: Optional[Exception] = None
@@ -1019,10 +1095,20 @@ async def get_bars(
# TODO: retrieve underlying ``ib_insync`` error?
if err.code == 162:
# TODO: so this error is normally raised (it seems) if
# we try to retrieve history for a time range for which
# there is none. in that case we should not only report
# the "empty range" but also do a iteration on the time
# step for ``next_dt`` to see if we can pull older
# history.
if 'HMDS query returned no data' in err.message:
# means we hit some kind of historical "dead zone"
# and further requests seem to always cause
# throttling despite the rps being low
# means we hit some kind of historical "empty space"
# and further requests will need to decrement the
# start time dt in order to not receive a further
# error?
# OLDER: seem to always cause throttling despite low rps
# raise err
break
elif 'No market data permissions for' in err.message:
@@ -1045,17 +1131,22 @@ async def get_bars(
fails += 1
continue
return (None, None)
return None, None
# else: # throttle wasn't fixed so error out immediately
# raise _err
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
# count: int = 20, # NOTE: any more and we'll overrun underlying buffer
count: int = 6, # NOTE: any more and we'll overrun the underlying buffer
# TODO: we want to avoid overrunning the underlying shm array buffer
# and we should probably calc the number of calls to make depending
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
count: int = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -1066,7 +1157,13 @@ async def backfill_bars(
https://github.com/pikers/piker/issues/128
"""
(first_bars, bars_array, next_dt), fails = await get_bars(sym)
out, fails = await get_bars(sym)
if out is None:
raise RuntimeError("Could not pull current history?!")
(first_bars, bars_array, next_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# write historical data to buffer
shm.push(bars_array)
@@ -1091,6 +1188,17 @@ async def backfill_bars(
continue
bars, bars_array, next_dt = out
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day don't
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
shm.push(bars_array, prepend=True)
i += 1
@@ -1118,14 +1226,21 @@ _quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
symbol: str,
opts: tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None,
) -> None:
) -> trio.abc.ReceiveChannel:
"""Stream a ticker using the std L1 api.
"""
global _quote_streams
to_trio.send_nowait(None)
async with load_aio_clients() as (
client,
clients,
@@ -1134,23 +1249,10 @@ async def _setup_quote_stream(
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
# # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
client.ib.cancelMktData(contract)
@@ -1158,41 +1260,73 @@ async def _setup_quote_stream(
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except (
trio.BrokenResourceError,
# XXX: HACK, not sure why this gets left stale (probably
# due to our terrible ``tractor.to_asyncio``
# implementation for streams.. but if the mem chan
# gets left here and starts blocking just kill the feed?
# trio.WouldBlock,
):
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
finally:
teardown()
return from_aio
# return from_aio
async def start_aio_quote_stream(
@acm
async def open_aio_quote_stream(
symbol: str,
contract: Optional[Contract] = None,
) -> trio.abc.ReceiveStream:
from tractor.trionics import broadcast_receiver
global _quote_streams
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer
return from_aio.clone()
async with broadcast_receiver(from_aio) as from_aio:
yield from_aio
return
else:
from_aio = await tractor.to_asyncio.run_task(
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
contract=contract,
)
) as (first, from_aio):
# cache feed for later consumers
_quote_streams[symbol] = from_aio
return from_aio
yield from_aio
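Consumers now simply enter the cm: the first task for a symbol triggers the asyncio-side setup while subsequent tasks receive a broadcast clone of the cached channel. Typical usage (the symbol is illustrative):

async with open_aio_quote_stream(symbol='mnq.globex') as stream:
    async for ticker in stream:
        # each item is an ``ib_insync`` ``Ticker`` snapshot
        print(ticker.last)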
async def stream_quotes(
@@ -1221,7 +1355,10 @@ async def stream_quotes(
symbol=sym,
)
stream = await start_aio_quote_stream(symbol=sym, contract=contract)
# stream = await start_aio_quote_stream(symbol=sym, contract=contract)
async with open_aio_quote_stream(
symbol=sym, contract=contract
) as stream:
# pass back some symbol info like min_tick, trading_hours, etc.
syminfo = asdict(details)
@@ -1286,6 +1423,7 @@ async def stream_quotes(
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
# suffix = 'exchange'
# calc_price = False # should be real volume for contract
@@ -1298,7 +1436,8 @@ async def stream_quotes(
# for a real volume contract we wait for the first
# "real" trade to take place
if not calc_price and not ticker.rtTime:
# spin consuming tickers until we get a real market datum
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
@@ -1315,7 +1454,6 @@ async def stream_quotes(
feed_is_live.set()
# last = time.time()
async with aclosing(stream):
async for ticker in stream:
# print(f'ticker rate: {1/(time.time() - last)}')
@@ -1472,13 +1610,24 @@ async def trades_dialogue(
accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
async with trio.open_nursery() as nurse:
for account, client in _accounts2clients.items():
# each client to an api endpoint will have it's own event stream
trade_event_stream = await _trio_run_client_method(
method='recv_trade_updates',
async def open_stream(
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
)
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_stream)
clients.append((client, trade_event_stream))
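``open_stream`` here is the usual trio idiom for holding an async context manager open from a background task: enter the cm, hand the resource back via ``task_status.started()``, then park until the nursery tears down. Generalized to its skeleton:

import trio

async def hold_open(
    acm,  # any async context manager yielding a resource
    task_status=trio.TASK_STATUS_IGNORED,
):
    async with acm as resource:
        task_status.started(resource)
        await trio.sleep_forever()  # cm stays entered until cancelled

# usage: resource = await nursery.start(hold_open, make_acm())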
for client in _client_cache.values():
@@ -1488,7 +1637,10 @@ async def trades_dialogue(
accounts.add(msg.account)
all_positions.append(msg.dict())
await ctx.started((all_positions, accounts))
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
async with (
ctx.open_stream() as ems_stream,


@@ -25,7 +25,7 @@ from dataclasses import dataclass, field
import trio
import tractor
from tractor._broadcast import broadcast_receiver
from tractor.trionics import broadcast_receiver
from ..data._source import Symbol
from ..log import get_logger


@@ -868,7 +868,9 @@ async def display_symbol_data(
)
async with (
maybe_open_vlm_display(linkedsplits, ohlcv),
# XXX: this slipped in during a commit refactor,
# it's actually landing proper in #231
# maybe_open_vlm_display(linkedsplits, ohlcv),
open_order_mode(
feed,


@@ -0,0 +1,61 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
IB api client data feed reset hack for i3.
'''
import subprocess
import i3ipc
i3 = i3ipc.Connection()
t = i3.get_tree()
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway.', # gw running in i3
]
for name in win_names:
results = t.find_named(name)
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
# move mouse to bottom left of window (where there should
# be nothing to click).
'mousemove_relative', '--sync', str(w-3), str(h-3),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id, '1',
# hackzorzes
'key', 'ctrl+alt+f',
])
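This PR ships the script standalone with no integration. One hedged way to wire it up would be shelling out from the error-162 "dead zone" throttling case in ``get_bars``; the script path below is hypothetical, not part of this change:

import subprocess

def reset_ib_feed() -> None:
    # hypothetical location for the i3/xdotool hack above
    subprocess.call(['python', 'scripts/ib_data_reset.py'])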